You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2019/06/15 17:51:03 UTC
[storm] branch master updated: Add missing @Override annotations
(help to detect inheritance issues when generics are used more intensively)
This is an automated email from the ASF dual-hosted git repository.
srdo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push:
new f7598f5 Add missing @Override annotations (help to detect inheritance issues when generics are used more intensively)
new 5d8731e Merge pull request #3020 from krichter722/override
f7598f5 is described below
commit f7598f5e9069c6b7e0d0825f7e82861c8011ec12
Author: Karl-Philipp Richter <kr...@aol.de>
AuthorDate: Mon Mar 9 02:26:30 2015 +0100
Add missing @Override annotations (help to detect inheritance issues when generics are used more intensively)
---
.../storm/hbase/topology/TotalWordCounter.java | 4 ++++
.../apache/storm/hbase/topology/WordCounter.java | 4 ++++
.../org/apache/storm/hbase/topology/WordSpout.java | 6 +++++
.../apache/storm/hdfs/bolt/HdfsFileTopology.java | 8 +++++++
.../storm/hdfs/bolt/SequenceFileTopology.java | 8 +++++++
.../storm/hive/bolt/BucketTestHiveTopology.java | 5 ++++
.../org/apache/storm/hive/bolt/HiveTopology.java | 5 ++++
.../storm/hive/bolt/HiveTopologyPartitioned.java | 5 ++++
.../org/apache/storm/jdbc/spout/UserSpout.java | 6 +++++
.../org/apache/storm/jms/example/GenericBolt.java | 4 ++++
.../storm/jms/example/JsonTupleProducer.java | 2 ++
.../storm/jms/example/SpringJmsProvider.java | 2 ++
.../KafkaSpoutTopologyMainWildcardTopicsLocal.java | 1 +
.../storm/loadgen/HttpForwardingMetricsServer.java | 1 +
.../storm/mongodb/topology/TotalWordCounter.java | 4 ++++
.../apache/storm/mongodb/topology/WordCounter.java | 4 ++++
.../apache/storm/mongodb/topology/WordSpout.java | 6 +++++
.../apache/storm/perf/StrGenSpoutHdfsBoltTopo.java | 1 +
.../java/org/apache/storm/perf/utils/Helper.java | 1 +
.../apache/storm/redis/topology/WordCounter.java | 4 ++++
.../org/apache/storm/redis/topology/WordSpout.java | 6 +++++
.../storm/solr/topology/SolrFieldsTopology.java | 2 ++
.../storm/solr/topology/SolrJsonTopology.java | 1 +
.../solr/trident/SolrFieldsTridentTopology.java | 1 +
.../solr/trident/SolrJsonTridentTopology.java | 1 +
.../org/apache/storm/starter/RollingTopWords.java | 1 +
.../storm/starter/SkewedRollingTopWords.java | 1 +
.../apache/storm/starter/WordCountTopology.java | 1 +
.../storm/starter/spout/RandomSentenceSpout.java | 1 +
.../starter/tools/RankableObjectWithFields.java | 3 +++
.../org/apache/storm/starter/tools/Rankings.java | 1 +
.../storm/starter/trident/DebugMemoryMapState.java | 1 +
.../apache/storm/starter/tools/RankingsTest.java | 1 +
.../executor/impl/BatchAsyncResultHandler.java | 2 ++
.../executor/impl/SingleAsyncResultHandler.java | 2 ++
.../ObjectMapperCqlStatementMapperBuilder.java | 1 +
.../storm/hbase/trident/state/HBaseMapState.java | 1 +
.../windowing/HBaseWindowsStoreFactory.java | 1 +
.../hdfs/blobstore/HdfsBlobStoreImplTest.java | 1 +
.../apache/storm/hdfs/bolt/AbstractHdfsBolt.java | 1 +
.../hdfs/bolt/format/DefaultFileNameFormat.java | 1 +
.../storm/hdfs/common/AbstractHDFSWriter.java | 5 ++++
.../java/org/apache/storm/hdfs/spout/FileLock.java | 1 +
.../org/apache/storm/hdfs/spout/HdfsSpout.java | 2 ++
.../storm/hdfs/spout/SequenceFileReader.java | 2 ++
.../apache/storm/hdfs/spout/TextFileReader.java | 2 ++
.../hdfs/trident/format/DefaultFileNameFormat.java | 1 +
.../org/apache/storm/hdfs/spout/TestHdfsSpout.java | 1 +
.../java/org/apache/storm/jms/spout/JmsSpout.java | 2 ++
.../apache/storm/jms/spout/MockJmsProvider.java | 2 ++
.../apache/storm/kafka/spout/KafkaSpoutConfig.java | 1 +
.../apache/storm/kafka/spout/RecordTranslator.java | 1 +
.../common/mapper/SimpleMongoUpdateMapper.java | 1 +
.../storm/mongodb/trident/state/MongoMapState.java | 1 +
.../storm/mqtt/mappers/ByteArrayMessageMapper.java | 2 ++
.../storm/mqtt/mappers/StringMessageMapper.java | 2 ++
.../org/apache/storm/mqtt/spout/MqttSpout.java | 19 +++++++++++++++
.../bolt/TupleOpenTsdbDatapointMapper.java | 1 +
.../apache/storm/flux/examples/WordCounter.java | 4 ++++
pom.xml | 26 ++++++++++++++++++++
.../sql/compiler/RexNodeToJavaCodeCompiler.java | 1 +
.../datasource/socket/spout/SocketSpout.java | 1 +
.../src/main/resources/storm/pmd-ruleset.xml | 28 ++++++++++++++++++++++
.../src/jvm/org/apache/storm/ILocalDRPC.java | 1 +
.../src/jvm/org/apache/storm/LogWriter.java | 1 +
.../assignments/ILocalAssignmentsBackend.java | 1 +
.../apache/storm/blobstore/ClientBlobStore.java | 1 +
.../apache/storm/coordination/CoordinatedBolt.java | 11 +++++++++
.../storm/daemon/supervisor/AdvancedFSOps.java | 19 +++++++++++++++
.../daemon/supervisor/ClientSupervisorUtils.java | 1 +
.../apache/storm/drpc/DRPCInvocationsClient.java | 3 +++
.../src/jvm/org/apache/storm/drpc/JoinResult.java | 3 +++
.../jvm/org/apache/storm/drpc/KeyedFairBolt.java | 7 +++++-
.../jvm/org/apache/storm/drpc/PrepareRequest.java | 1 +
.../jvm/org/apache/storm/drpc/ReturnResults.java | 1 +
.../apache/storm/executor/ExecutorShutdown.java | 1 +
.../executor/bolt/BoltOutputCollectorImpl.java | 1 +
.../apache/storm/generated/NumErrorsChoice.java | 1 +
.../storm/generated/TopologyInitialStatus.java | 1 +
.../src/jvm/org/apache/storm/grouping/Load.java | 1 +
.../apache/storm/grouping/PartialKeyGrouping.java | 2 ++
.../org/apache/storm/messaging/netty/Client.java | 1 +
.../messaging/netty/KerberosSaslNettyClient.java | 3 +++
.../messaging/netty/KerberosSaslNettyServer.java | 2 ++
.../org/apache/storm/messaging/netty/Login.java | 1 +
.../storm/messaging/netty/SaslNettyClient.java | 1 +
.../apache/storm/metric/api/AssignableMetric.java | 1 +
.../apache/storm/metric/api/CombinedMetric.java | 1 +
.../org/apache/storm/metric/api/CountMetric.java | 1 +
.../org/apache/storm/metric/api/MeanReducer.java | 3 +++
.../apache/storm/metric/api/MultiCountMetric.java | 1 +
.../storm/metric/api/MultiReducedMetric.java | 1 +
.../org/apache/storm/metric/api/ReducedMetric.java | 1 +
.../metric/api/rpc/AssignableShellMetric.java | 1 +
.../storm/metric/api/rpc/CombinedShellMetric.java | 1 +
.../storm/metric/api/rpc/CountShellMetric.java | 1 +
.../storm/metric/api/rpc/ReducedShellMetric.java | 1 +
.../storm/metric/internal/CountStatAndMetric.java | 1 +
.../metric/internal/LatencyStatAndMetric.java | 1 +
.../apache/storm/metric/internal/RateTracker.java | 1 +
.../org/apache/storm/multilang/JsonSerializer.java | 6 +++++
.../apache/storm/pacemaker/PacemakerClient.java | 3 +++
.../org/apache/storm/security/auth/AutoSSL.java | 1 +
.../security/auth/DefaultPrincipalToLocal.java | 1 +
.../security/auth/KerberosPrincipalToLocal.java | 1 +
.../storm/security/auth/SimpleTransportPlugin.java | 3 +++
.../security/auth/authorizer/DenyAuthorizer.java | 2 ++
.../security/auth/authorizer/NoopAuthorizer.java | 2 ++
.../auth/digest/DigestSaslTransportPlugin.java | 1 +
.../auth/kerberos/AutoTGTKrb5LoginModule.java | 5 ++++
.../auth/kerberos/ClientCallbackHandler.java | 1 +
.../auth/kerberos/KerberosSaslTransportPlugin.java | 1 +
.../auth/kerberos/ServerCallbackHandler.java | 1 +
.../security/auth/sasl/SaslTransportPlugin.java | 2 ++
.../storm/serialization/DefaultKryoFactory.java | 1 +
.../storm/serialization/KryoTupleSerializer.java | 1 +
.../src/jvm/org/apache/storm/spout/RawScheme.java | 2 ++
.../src/jvm/org/apache/storm/spout/ShellSpout.java | 5 ++++
.../apache/storm/spout/SpoutOutputCollector.java | 2 ++
.../storm/state/BaseBinaryStateIterator.java | 5 ++++
.../apache/storm/state/DefaultStateEncoder.java | 4 ++++
.../apache/storm/streams/operations/Reducer.java | 1 +
.../src/jvm/org/apache/storm/task/ShellBolt.java | 5 ++++
.../jvm/org/apache/storm/task/TopologyContext.java | 3 +++
.../jvm/org/apache/storm/testing/BoltTracker.java | 1 +
.../jvm/org/apache/storm/testing/FeederSpout.java | 6 +++++
.../org/apache/storm/testing/FixedTupleSpout.java | 5 ++++
.../apache/storm/testing/NonRichBoltTracker.java | 3 +++
.../storm/testing/PythonShellMetricsBolt.java | 3 +++
.../storm/testing/PythonShellMetricsSpout.java | 2 ++
.../storm/testing/SingleUserSimpleTransport.java | 2 ++
.../jvm/org/apache/storm/testing/SpoutTracker.java | 8 +++++++
.../storm/testing/TestAggregatesCounter.java | 4 ++++
.../apache/storm/testing/TestEventLogSpout.java | 6 +++++
.../storm/testing/TestEventOrderCheckBolt.java | 4 ++++
.../org/apache/storm/testing/TestGlobalCount.java | 4 ++++
.../apache/storm/testing/TestKryoDecorator.java | 1 +
.../org/apache/storm/testing/TestPlannerBolt.java | 3 +++
.../org/apache/storm/testing/TestPlannerSpout.java | 6 +++++
.../org/apache/storm/testing/TestWordCounter.java | 4 ++++
.../org/apache/storm/testing/TestWordSpout.java | 6 +++++
.../org/apache/storm/testing/TupleCaptureBolt.java | 3 +++
.../apache/storm/topology/BasicBoltExecutor.java | 5 ++++
.../storm/topology/BasicOutputCollector.java | 4 ++++
.../storm/topology/CheckpointTupleForwarder.java | 2 ++
.../apache/storm/topology/OutputFieldsGetter.java | 4 ++++
.../trident/fluent/ChainedAggregatorDeclarer.java | 13 ++++++++++
.../trident/operation/TridentOperationContext.java | 3 +++
.../operation/impl/ChainedAggregatorImpl.java | 5 ++++
.../impl/CombinerAggregatorCombineImpl.java | 5 ++++
.../operation/impl/ReducerAggregatorImpl.java | 5 ++++
.../storm/trident/testing/FeederBatchSpout.java | 1 +
.../storm/trident/testing/LRUMemoryMapState.java | 12 ++++++++++
.../storm/trident/testing/MemoryMapState.java | 12 ++++++++++
.../storm/trident/topology/TransactionAttempt.java | 2 ++
.../trident/topology/TridentBoltExecutor.java | 6 +++++
.../windowing/AbstractTridentWindowManager.java | 2 ++
.../windowing/InMemoryTridentWindowManager.java | 1 +
.../windowing/StoreBasedTridentWindowManager.java | 4 ++++
.../trident/windowing/config/BaseWindowConfig.java | 1 +
.../src/jvm/org/apache/storm/tuple/Fields.java | 1 +
.../src/jvm/org/apache/storm/utils/DRPCClient.java | 1 +
.../apache/storm/utils/DefaultShellLogHandler.java | 2 ++
.../jvm/org/apache/storm/utils/TimeCacheMap.java | 1 +
.../windowing/WatermarkCountEvictionPolicy.java | 1 +
.../storm/PaceMakerStateStorageFactoryTest.java | 2 ++
.../jvm/org/apache/storm/bolt/TestJoinBolt.java | 2 ++
.../storm/dependency/DependencyUploaderTest.java | 2 ++
.../org/apache/storm/clojure/ClojureTuple.java | 4 ++++
.../apache/storm/clojure/IndifferentAccessMap.java | 22 +++++++++++++++++
.../localizer/LocalizedResourceRetentionSet.java | 1 +
.../storm/logging/filters/AccessLoggingFilter.java | 3 +++
.../java/org/apache/storm/metricstore/Metric.java | 1 +
.../org/apache/storm/metricstore/MetricStore.java | 1 +
.../storm/metricstore/rocksdb/RocksDbStore.java | 2 ++
.../metricstore/rocksdb/StringMetadataCache.java | 6 +++++
.../org/apache/storm/scheduler/resource/User.java | 1 +
.../blacklist/TestUtilsForBlacklistScheduler.java | 6 +++++
.../TestUtilsForResourceAwareScheduler.java | 6 +++++
.../storm/daemon/drpc/webapp/ReqContextFilter.java | 5 +++-
.../exceptionmappers/NotAliveExceptionMapper.java | 1 +
181 files changed, 585 insertions(+), 2 deletions(-)
diff --git a/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/TotalWordCounter.java b/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/TotalWordCounter.java
index 313340a..a1017df 100644
--- a/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/TotalWordCounter.java
+++ b/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/TotalWordCounter.java
@@ -32,6 +32,7 @@ public class TotalWordCounter implements IBasicBolt {
private static final Random RANDOM = new Random();
private BigInteger total = BigInteger.ZERO;
+ @Override
public void prepare(Map<String, Object> topoConf, TopologyContext context) {
}
@@ -39,6 +40,7 @@ public class TotalWordCounter implements IBasicBolt {
* Just output the word value with a count of 1.
* The HBaseBolt will handle incrementing the counter.
*/
+ @Override
public void execute(Tuple input, BasicOutputCollector collector) {
total = total.add(new BigInteger(input.getValues().get(1).toString()));
collector.emit(tuple(total.toString()));
@@ -48,10 +50,12 @@ public class TotalWordCounter implements IBasicBolt {
}
}
+ @Override
public void cleanup() {
LOG.info("Final total = " + total);
}
+ @Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("total"));
}
diff --git a/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/WordCounter.java b/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/WordCounter.java
index 933cc49..699b2ca 100644
--- a/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/WordCounter.java
+++ b/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/WordCounter.java
@@ -25,6 +25,7 @@ import static org.apache.storm.utils.Utils.tuple;
public class WordCounter implements IBasicBolt {
+ @Override
public void prepare(Map<String, Object> topoConf, TopologyContext context) {
}
@@ -32,14 +33,17 @@ public class WordCounter implements IBasicBolt {
* Just output the word value with a count of 1.
* The HBaseBolt will handle incrementing the counter.
*/
+ @Override
public void execute(Tuple input, BasicOutputCollector collector) {
collector.emit(tuple(input.getValues().get(0), 1));
}
+ @Override
public void cleanup() {
}
+ @Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word", "count"));
}
diff --git a/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/WordSpout.java b/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/WordSpout.java
index 3e407d6..a80e80a 100644
--- a/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/WordSpout.java
+++ b/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/WordSpout.java
@@ -39,14 +39,17 @@ public class WordSpout implements IRichSpout {
return this.isDistributed;
}
+ @Override
public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
}
+ @Override
public void close() {
}
+ @Override
public void nextTuple() {
final Random rand = new Random();
final String word = words[rand.nextInt(words.length)];
@@ -54,14 +57,17 @@ public class WordSpout implements IRichSpout {
Thread.yield();
}
+ @Override
public void ack(Object msgId) {
}
+ @Override
public void fail(Object msgId) {
}
+ @Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
diff --git a/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/bolt/HdfsFileTopology.java b/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/bolt/HdfsFileTopology.java
index e3599d5..e0e3057 100644
--- a/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/bolt/HdfsFileTopology.java
+++ b/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/bolt/HdfsFileTopology.java
@@ -118,16 +118,19 @@ public class HdfsFileTopology {
private int count = 0;
private long total = 0L;
+ @Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("sentence", "timestamp"));
}
+ @Override
public void open(Map<String, Object> config, TopologyContext context,
SpoutOutputCollector collector) {
this.collector = collector;
this.pending = new ConcurrentHashMap<UUID, Values>();
}
+ @Override
public void nextTuple() {
Values values = new Values(sentences[index], System.currentTimeMillis());
UUID msgId = UUID.randomUUID();
@@ -146,10 +149,12 @@ public class HdfsFileTopology {
Thread.yield();
}
+ @Override
public void ack(Object msgId) {
this.pending.remove(msgId);
}
+ @Override
public void fail(Object msgId) {
System.out.println("**** RESENDING FAILED TUPLE");
this.collector.emit(this.pending.get(msgId), msgId);
@@ -161,15 +166,18 @@ public class HdfsFileTopology {
private HashMap<String, Long> counts = null;
private OutputCollector collector;
+ @Override
public void prepare(Map<String, Object> config, TopologyContext context, OutputCollector collector) {
this.counts = new HashMap<String, Long>();
this.collector = collector;
}
+ @Override
public void execute(Tuple tuple) {
collector.ack(tuple);
}
+ @Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// this bolt does not emit anything
}
diff --git a/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileTopology.java b/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileTopology.java
index 8ddd045..2a58249 100644
--- a/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileTopology.java
+++ b/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileTopology.java
@@ -124,16 +124,19 @@ public class SequenceFileTopology {
private int count = 0;
private long total = 0L;
+ @Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("sentence", "timestamp"));
}
+ @Override
public void open(Map<String, Object> config, TopologyContext context,
SpoutOutputCollector collector) {
this.collector = collector;
this.pending = new ConcurrentHashMap<UUID, Values>();
}
+ @Override
public void nextTuple() {
Values values = new Values(sentences[index], System.currentTimeMillis());
UUID msgId = UUID.randomUUID();
@@ -152,11 +155,13 @@ public class SequenceFileTopology {
Thread.yield();
}
+ @Override
public void ack(Object msgId) {
// System.out.println("ACK");
this.pending.remove(msgId);
}
+ @Override
public void fail(Object msgId) {
System.out.println("**** RESENDING FAILED TUPLE");
this.collector.emit(this.pending.get(msgId), msgId);
@@ -169,16 +174,19 @@ public class SequenceFileTopology {
private HashMap<String, Long> counts = null;
private OutputCollector collector;
+ @Override
public void prepare(Map<String, Object> config, TopologyContext context, OutputCollector collector) {
this.counts = new HashMap<String, Long>();
this.collector = collector;
}
+ @Override
public void execute(Tuple tuple) {
collector.ack(tuple);
}
+ @Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// this bolt does not emit anything
}
diff --git a/examples/storm-hive-examples/src/main/java/org/apache/storm/hive/bolt/BucketTestHiveTopology.java b/examples/storm-hive-examples/src/main/java/org/apache/storm/hive/bolt/BucketTestHiveTopology.java
index 19bf4d4..85bbde7 100644
--- a/examples/storm-hive-examples/src/main/java/org/apache/storm/hive/bolt/BucketTestHiveTopology.java
+++ b/examples/storm-hive-examples/src/main/java/org/apache/storm/hive/bolt/BucketTestHiveTopology.java
@@ -119,10 +119,12 @@ public class BucketTestHiveTopology {
return this;
}
+ @Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields(this.outputFields));
}
+ @Override
public void open(Map<String, Object> config, TopologyContext context,
SpoutOutputCollector collector) {
this.collector = collector;
@@ -135,6 +137,7 @@ public class BucketTestHiveTopology {
}
}
+ @Override
public void nextTuple() {
String line;
try {
@@ -161,10 +164,12 @@ public class BucketTestHiveTopology {
}
}
+ @Override
public void ack(Object msgId) {
this.pending.remove(msgId);
}
+ @Override
public void fail(Object msgId) {
System.out.println("**** RESENDING FAILED TUPLE");
this.collector.emit(this.pending.get(msgId), msgId);
diff --git a/examples/storm-hive-examples/src/main/java/org/apache/storm/hive/bolt/HiveTopology.java b/examples/storm-hive-examples/src/main/java/org/apache/storm/hive/bolt/HiveTopology.java
index d9d4ea9..5dbb036 100644
--- a/examples/storm-hive-examples/src/main/java/org/apache/storm/hive/bolt/HiveTopology.java
+++ b/examples/storm-hive-examples/src/main/java/org/apache/storm/hive/bolt/HiveTopology.java
@@ -100,16 +100,19 @@ public class HiveTopology {
private int count = 0;
private long total = 0L;
+ @Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("id","name","phone","street","city","state"));
}
+ @Override
public void open(Map<String, Object> config, TopologyContext context,
SpoutOutputCollector collector) {
this.collector = collector;
this.pending = new ConcurrentHashMap<UUID, Values>();
}
+ @Override
public void nextTuple() {
String[] user = sentences[index].split(",");
Values values = new Values(Integer.parseInt(user[0]),user[1],user[2],user[3],user[4],user[5]);
@@ -129,10 +132,12 @@ public class HiveTopology {
Thread.yield();
}
+ @Override
public void ack(Object msgId) {
this.pending.remove(msgId);
}
+ @Override
public void fail(Object msgId) {
System.out.println("**** RESENDING FAILED TUPLE");
this.collector.emit(this.pending.get(msgId), msgId);
diff --git a/examples/storm-hive-examples/src/main/java/org/apache/storm/hive/bolt/HiveTopologyPartitioned.java b/examples/storm-hive-examples/src/main/java/org/apache/storm/hive/bolt/HiveTopologyPartitioned.java
index 4d8089d..6af50de 100644
--- a/examples/storm-hive-examples/src/main/java/org/apache/storm/hive/bolt/HiveTopologyPartitioned.java
+++ b/examples/storm-hive-examples/src/main/java/org/apache/storm/hive/bolt/HiveTopologyPartitioned.java
@@ -101,16 +101,19 @@ public class HiveTopologyPartitioned {
private int count = 0;
private long total = 0L;
+ @Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("id","name","phone","street","city","state"));
}
+ @Override
public void open(Map<String, Object> config, TopologyContext context,
SpoutOutputCollector collector) {
this.collector = collector;
this.pending = new ConcurrentHashMap<UUID, Values>();
}
+ @Override
public void nextTuple() {
String[] user = sentences[index].split(",");
Values values = new Values(Integer.parseInt(user[0]),user[1],user[2],user[3],user[4],user[5]);
@@ -130,10 +133,12 @@ public class HiveTopologyPartitioned {
}
}
+ @Override
public void ack(Object msgId) {
this.pending.remove(msgId);
}
+ @Override
public void fail(Object msgId) {
System.out.println("**** RESENDING FAILED TUPLE");
this.collector.emit(this.pending.get(msgId), msgId);
diff --git a/examples/storm-jdbc-examples/src/main/java/org/apache/storm/jdbc/spout/UserSpout.java b/examples/storm-jdbc-examples/src/main/java/org/apache/storm/jdbc/spout/UserSpout.java
index 9641599..06f107c 100644
--- a/examples/storm-jdbc-examples/src/main/java/org/apache/storm/jdbc/spout/UserSpout.java
+++ b/examples/storm-jdbc-examples/src/main/java/org/apache/storm/jdbc/spout/UserSpout.java
@@ -47,14 +47,17 @@ public class UserSpout implements IRichSpout {
return this.isDistributed;
}
+ @Override
public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
}
+ @Override
public void close() {
}
+ @Override
public void nextTuple() {
final Random rand = new Random();
final Values row = rows.get(rand.nextInt(rows.size() - 1));
@@ -62,14 +65,17 @@ public class UserSpout implements IRichSpout {
Thread.yield();
}
+ @Override
public void ack(Object msgId) {
}
+ @Override
public void fail(Object msgId) {
}
+ @Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("user_id","user_name","create_date"));
}
diff --git a/examples/storm-jms-examples/src/main/java/org/apache/storm/jms/example/GenericBolt.java b/examples/storm-jms-examples/src/main/java/org/apache/storm/jms/example/GenericBolt.java
index 0b077c0..341a4dc 100644
--- a/examples/storm-jms-examples/src/main/java/org/apache/storm/jms/example/GenericBolt.java
+++ b/examples/storm-jms-examples/src/main/java/org/apache/storm/jms/example/GenericBolt.java
@@ -67,12 +67,14 @@ public class GenericBolt extends BaseRichBolt {
this(name, autoAck, autoAnchor, null);
}
+ @Override
public void prepare(Map<String, Object> topoConf, TopologyContext context,
OutputCollector collector) {
this.collector = collector;
}
+ @Override
public void execute(Tuple input) {
LOG.debug("[" + this.name + "] Received message: " + input);
@@ -94,10 +96,12 @@ public class GenericBolt extends BaseRichBolt {
}
+ @Override
public void cleanup() {
}
+ @Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
if (this.declaredFields != null) {
declarer.declare(this.declaredFields);
diff --git a/examples/storm-jms-examples/src/main/java/org/apache/storm/jms/example/JsonTupleProducer.java b/examples/storm-jms-examples/src/main/java/org/apache/storm/jms/example/JsonTupleProducer.java
index 9ee175e..9f88570 100644
--- a/examples/storm-jms-examples/src/main/java/org/apache/storm/jms/example/JsonTupleProducer.java
+++ b/examples/storm-jms-examples/src/main/java/org/apache/storm/jms/example/JsonTupleProducer.java
@@ -42,6 +42,7 @@ import org.apache.storm.tuple.Values;
@SuppressWarnings("serial")
public class JsonTupleProducer implements JmsTupleProducer {
+ @Override
public Values toTuple(Message msg) throws JMSException {
if(msg instanceof TextMessage){
String json = ((TextMessage) msg).getText();
@@ -51,6 +52,7 @@ public class JsonTupleProducer implements JmsTupleProducer {
}
}
+ @Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("json"));
}
diff --git a/examples/storm-jms-examples/src/main/java/org/apache/storm/jms/example/SpringJmsProvider.java b/examples/storm-jms-examples/src/main/java/org/apache/storm/jms/example/SpringJmsProvider.java
index 306fc25..747b415 100644
--- a/examples/storm-jms-examples/src/main/java/org/apache/storm/jms/example/SpringJmsProvider.java
+++ b/examples/storm-jms-examples/src/main/java/org/apache/storm/jms/example/SpringJmsProvider.java
@@ -63,10 +63,12 @@ public class SpringJmsProvider implements JmsProvider {
this.destination = (Destination)context.getBean(destinationBean);
}
+ @Override
public ConnectionFactory connectionFactory() throws Exception {
return this.connectionFactory;
}
+ @Override
public Destination destination() throws Exception {
return this.destination;
}
diff --git a/examples/storm-kafka-client-examples/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyMainWildcardTopicsLocal.java b/examples/storm-kafka-client-examples/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyMainWildcardTopicsLocal.java
index d7b8cd9..cf70bb3 100644
--- a/examples/storm-kafka-client-examples/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyMainWildcardTopicsLocal.java
+++ b/examples/storm-kafka-client-examples/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyMainWildcardTopicsLocal.java
@@ -22,6 +22,7 @@ public class KafkaSpoutTopologyMainWildcardTopicsLocal extends KafkaSpoutTopolog
new KafkaSpoutTopologyMainWildcardTopicsLocal().runExample();
}
+ @Override
protected KafkaSpoutTopologyMainNamedTopics getTopology() {
return new KafkaSpoutTopologyMainWildcardTopics();
}
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/HttpForwardingMetricsServer.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/HttpForwardingMetricsServer.java
index ca8c501..aa3e89f 100644
--- a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/HttpForwardingMetricsServer.java
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/HttpForwardingMetricsServer.java
@@ -54,6 +54,7 @@ public abstract class HttpForwardingMetricsServer {
};
private class MetricsCollectionServlet extends HttpServlet {
+ @Override
protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
Input in = new Input(request.getInputStream());
List<Object> metrics = des.get().deserializeFrom(in);
diff --git a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/TotalWordCounter.java b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/TotalWordCounter.java
index 4dee7a0..b5d20cb 100644
--- a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/TotalWordCounter.java
+++ b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/TotalWordCounter.java
@@ -37,12 +37,14 @@ public class TotalWordCounter implements IBasicBolt {
private BigInteger total = BigInteger.ZERO;
private static final Logger LOG = LoggerFactory.getLogger(TotalWordCounter.class);
private static final Random RANDOM = new Random();
+ @Override
public void prepare(Map<String, Object> topoConf, TopologyContext context) {
}
/*
* Just output the word value with a count of 1.
*/
+ @Override
public void execute(Tuple input, BasicOutputCollector collector) {
total = total.add(new BigInteger(input.getValues().get(1).toString()));
collector.emit(tuple(total.toString()));
@@ -52,10 +54,12 @@ public class TotalWordCounter implements IBasicBolt {
}
}
+ @Override
public void cleanup() {
LOG.info("Final total = " + total);
}
+ @Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("total"));
}
diff --git a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/WordCounter.java b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/WordCounter.java
index 4054274..ec96346 100644
--- a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/WordCounter.java
+++ b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/WordCounter.java
@@ -31,10 +31,12 @@ import java.util.Map;
public class WordCounter implements IBasicBolt {
private Map<String, Integer> wordCounter = Maps.newHashMap();
+ @Override
public void prepare(Map<String, Object> topoConf, TopologyContext context) {
}
+ @Override
public void execute(Tuple input, BasicOutputCollector collector) {
String word = input.getStringByField("word");
int count;
@@ -49,10 +51,12 @@ public class WordCounter implements IBasicBolt {
collector.emit(new Values(word, String.valueOf(count)));
}
+ @Override
public void cleanup() {
}
+ @Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word", "count"));
}
diff --git a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/WordSpout.java b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/WordSpout.java
index f0bdde2..aa14373 100644
--- a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/WordSpout.java
+++ b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/WordSpout.java
@@ -45,14 +45,17 @@ public class WordSpout implements IRichSpout {
return this.isDistributed;
}
+ @Override
public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
}
+ @Override
public void close() {
}
+ @Override
public void nextTuple() {
final Random rand = new Random();
final String word = words[rand.nextInt(words.length)];
@@ -60,14 +63,17 @@ public class WordSpout implements IRichSpout {
Thread.yield();
}
+ @Override
public void ack(Object msgId) {
}
+ @Override
public void fail(Object msgId) {
}
+ @Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/StrGenSpoutHdfsBoltTopo.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/StrGenSpoutHdfsBoltTopo.java
index 110bf79..fa0d90b 100755
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/StrGenSpoutHdfsBoltTopo.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/StrGenSpoutHdfsBoltTopo.java
@@ -153,6 +153,7 @@ public class StrGenSpoutHdfsBoltTopo {
return this;
}
+ @Override
public byte[] format(Tuple tuple) {
return (tuple.getValueByField(fieldName).toString() + this.lineDelimiter).getBytes();
}
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/Helper.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/Helper.java
index a0f838e..b3a206c 100755
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/Helper.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/Helper.java
@@ -74,6 +74,7 @@ public class Helper {
Map<String, Object> clusterConf = Utils.readStormConfig();
final Nimbus.Iface client = NimbusClient.getConfiguredClient(clusterConf).getClient();
Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override
public void run() {
try {
System.out.println("Killing...");
diff --git a/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/topology/WordCounter.java b/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/topology/WordCounter.java
index c321b83..8595d1e 100644
--- a/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/topology/WordCounter.java
+++ b/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/topology/WordCounter.java
@@ -31,9 +31,11 @@ import java.util.Map;
public class WordCounter implements IBasicBolt {
private Map<String, Integer> wordCounter = Maps.newHashMap();
+ @Override
public void prepare(Map<String, Object> topoConf, TopologyContext context) {
}
+ @Override
public void execute(Tuple input, BasicOutputCollector collector) {
String word = input.getStringByField("word");
int count;
@@ -48,10 +50,12 @@ public class WordCounter implements IBasicBolt {
collector.emit(new Values(word, String.valueOf(count)));
}
+ @Override
public void cleanup() {
}
+ @Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word", "count"));
}
diff --git a/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/topology/WordSpout.java b/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/topology/WordSpout.java
index ef45cd2..a387762 100644
--- a/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/topology/WordSpout.java
+++ b/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/topology/WordSpout.java
@@ -45,14 +45,17 @@ public class WordSpout implements IRichSpout {
return this.isDistributed;
}
+ @Override
public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
}
+ @Override
public void close() {
}
+ @Override
public void nextTuple() {
final Random rand = new Random();
final String word = words[rand.nextInt(words.length)];
@@ -60,14 +63,17 @@ public class WordSpout implements IRichSpout {
Thread.yield();
}
+ @Override
public void ack(Object msgId) {
}
+ @Override
public void fail(Object msgId) {
}
+ @Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
diff --git a/examples/storm-solr-examples/src/main/java/org/apache/storm/solr/topology/SolrFieldsTopology.java b/examples/storm-solr-examples/src/main/java/org/apache/storm/solr/topology/SolrFieldsTopology.java
index 030da1a..0157cfc 100644
--- a/examples/storm-solr-examples/src/main/java/org/apache/storm/solr/topology/SolrFieldsTopology.java
+++ b/examples/storm-solr-examples/src/main/java/org/apache/storm/solr/topology/SolrFieldsTopology.java
@@ -43,10 +43,12 @@ public class SolrFieldsTopology extends SolrTopology {
.setMultiValueFieldToken("%").build();
}
+ @Override
protected SolrCommitStrategy getSolrCommitStgy() {
return new CountBasedCommit(2); // To Commit to Solr and Ack according to the commit strategy
}
+ @Override
protected StormTopology getTopology() throws IOException {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("SolrFieldsSpout", new SolrFieldsSpout());
diff --git a/examples/storm-solr-examples/src/main/java/org/apache/storm/solr/topology/SolrJsonTopology.java b/examples/storm-solr-examples/src/main/java/org/apache/storm/solr/topology/SolrJsonTopology.java
index 24e6b5e..d76ffb7 100644
--- a/examples/storm-solr-examples/src/main/java/org/apache/storm/solr/topology/SolrJsonTopology.java
+++ b/examples/storm-solr-examples/src/main/java/org/apache/storm/solr/topology/SolrJsonTopology.java
@@ -38,6 +38,7 @@ public class SolrJsonTopology extends SolrTopology {
return new SolrJsonMapper.Builder(COLLECTION, jsonTupleField).build();
}
+ @Override
protected StormTopology getTopology() throws IOException {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("SolrJsonSpout", new SolrJsonSpout());
diff --git a/examples/storm-solr-examples/src/main/java/org/apache/storm/solr/trident/SolrFieldsTridentTopology.java b/examples/storm-solr-examples/src/main/java/org/apache/storm/solr/trident/SolrFieldsTridentTopology.java
index ca64be0..2a230f6 100644
--- a/examples/storm-solr-examples/src/main/java/org/apache/storm/solr/trident/SolrFieldsTridentTopology.java
+++ b/examples/storm-solr-examples/src/main/java/org/apache/storm/solr/trident/SolrFieldsTridentTopology.java
@@ -35,6 +35,7 @@ public class SolrFieldsTridentTopology extends SolrFieldsTopology {
solrFieldsTridentTopology.run(args);
}
+ @Override
protected StormTopology getTopology() throws IOException {
final TridentTopology tridentTopology = new TridentTopology();
final SolrFieldsSpout spout = new SolrFieldsSpout();
diff --git a/examples/storm-solr-examples/src/main/java/org/apache/storm/solr/trident/SolrJsonTridentTopology.java b/examples/storm-solr-examples/src/main/java/org/apache/storm/solr/trident/SolrJsonTridentTopology.java
index 75131b8..ae3e088 100644
--- a/examples/storm-solr-examples/src/main/java/org/apache/storm/solr/trident/SolrJsonTridentTopology.java
+++ b/examples/storm-solr-examples/src/main/java/org/apache/storm/solr/trident/SolrJsonTridentTopology.java
@@ -34,6 +34,7 @@ public class SolrJsonTridentTopology extends SolrJsonTopology {
solrJsonTridentTopology.run(args);
}
+ @Override
protected StormTopology getTopology() throws IOException {
final TridentTopology topology = new TridentTopology();
final SolrJsonSpout spout = new SolrJsonSpout();
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/RollingTopWords.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/RollingTopWords.java
index 23f56c2..f8ad8c2 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/RollingTopWords.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/RollingTopWords.java
@@ -61,6 +61,7 @@ public class RollingTopWords extends ConfigurableTopology {
* locally ("-local") or remotely, i.e. on a real cluster.
* @throws Exception
*/
+ @Override
protected int run(String[] args) {
String topologyName = "slidingWindowCounts";
if (args.length >= 1) {
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/SkewedRollingTopWords.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/SkewedRollingTopWords.java
index a3b8296..a173854 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/SkewedRollingTopWords.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/SkewedRollingTopWords.java
@@ -64,6 +64,7 @@ public class SkewedRollingTopWords extends ConfigurableTopology {
* locally ("-local") or remotely, i.e. on a real cluster.
* @throws Exception
*/
+ @Override
protected int run(String[] args) {
String topologyName = "slidingWindowCounts";
if (args.length >= 1) {
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java
index 32ec822..628aad7 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java
@@ -35,6 +35,7 @@ public class WordCountTopology extends ConfigurableTopology {
ConfigurableTopology.start(new WordCountTopology(), args);
}
+ @Override
protected int run(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/spout/RandomSentenceSpout.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/spout/RandomSentenceSpout.java
index e3e0a68..f6067ae 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/spout/RandomSentenceSpout.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/spout/RandomSentenceSpout.java
@@ -82,6 +82,7 @@ public class RandomSentenceSpout extends BaseRichSpout {
this.prefix = prefix;
}
+ @Override
protected String sentence(String input) {
return prefix + currentDate() + " " + input;
}
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/RankableObjectWithFields.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/RankableObjectWithFields.java
index 0b3808e..48bc261 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/RankableObjectWithFields.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/RankableObjectWithFields.java
@@ -63,10 +63,12 @@ public class RankableObjectWithFields implements Rankable, Serializable {
return new RankableObjectWithFields(obj, count, otherFields.toArray());
}
+ @Override
public Object getObject() {
return obj;
}
+ @Override
public long getCount() {
return count;
}
@@ -111,6 +113,7 @@ public class RankableObjectWithFields implements Rankable, Serializable {
return result;
}
+ @Override
public String toString() {
StringBuffer buf = new StringBuffer();
buf.append("[");
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/Rankings.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/Rankings.java
index eae6ccd..85a123b 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/Rankings.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/tools/Rankings.java
@@ -135,6 +135,7 @@ public class Rankings implements Serializable {
}
}
+ @Override
public String toString() {
return rankedItems.toString();
}
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/DebugMemoryMapState.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/DebugMemoryMapState.java
index 4849f72..ceac936 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/DebugMemoryMapState.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/DebugMemoryMapState.java
@@ -40,6 +40,7 @@ public class DebugMemoryMapState<T> extends MemoryMapState<T> {
super(id);
}
+ @Override
public List<T> multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters) {
print(keys, updaters);
if ((updateCount++ % 5) == 0) {
diff --git a/examples/storm-starter/test/jvm/org/apache/storm/starter/tools/RankingsTest.java b/examples/storm-starter/test/jvm/org/apache/storm/starter/tools/RankingsTest.java
index 8ee4f22..206d45c 100644
--- a/examples/storm-starter/test/jvm/org/apache/storm/starter/tools/RankingsTest.java
+++ b/examples/storm-starter/test/jvm/org/apache/storm/starter/tools/RankingsTest.java
@@ -341,6 +341,7 @@ public class RankingsTest {
// when
blitzer.blitz(new Runnable() {
+ @Override
public void run() {
for (Rankable r : entries) {
try {
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/impl/BatchAsyncResultHandler.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/impl/BatchAsyncResultHandler.java
index 99a27ef..27139fa 100644
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/impl/BatchAsyncResultHandler.java
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/impl/BatchAsyncResultHandler.java
@@ -41,6 +41,7 @@ public class BatchAsyncResultHandler implements AsyncResultHandler<List<Tuple>>
*
* The default method does no-operation.
*/
+ @Override
public void failure(Throwable t, List<Tuple> input) {
completed.offer(new ExecutionResultCollector.FailedCollector(input, t));
}
@@ -50,6 +51,7 @@ public class BatchAsyncResultHandler implements AsyncResultHandler<List<Tuple>>
*
* The default method does no-operation.
*/
+ @Override
public void success(List<Tuple> input) {
completed.offer(new ExecutionResultCollector.SucceedCollector(input));
}
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/impl/SingleAsyncResultHandler.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/impl/SingleAsyncResultHandler.java
index d7cc19b..127a06d 100644
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/impl/SingleAsyncResultHandler.java
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/impl/SingleAsyncResultHandler.java
@@ -40,6 +40,7 @@ public class SingleAsyncResultHandler implements AsyncResultHandler<Tuple> {
*
* The default method does no-operation.
*/
+ @Override
public void failure(Throwable t, Tuple input) {
completed.offer(new ExecutionResultCollector.FailedCollector(input, t));
}
@@ -49,6 +50,7 @@ public class SingleAsyncResultHandler implements AsyncResultHandler<Tuple> {
*
* The default method does no-operation.
*/
+ @Override
public void success(Tuple input) {
completed.offer(new ExecutionResultCollector.SucceedCollector(input));
}
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/builder/ObjectMapperCqlStatementMapperBuilder.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/builder/ObjectMapperCqlStatementMapperBuilder.java
index 2044f2d..d7961d8 100644
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/builder/ObjectMapperCqlStatementMapperBuilder.java
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/builder/ObjectMapperCqlStatementMapperBuilder.java
@@ -47,6 +47,7 @@ public class ObjectMapperCqlStatementMapperBuilder implements CQLStatementBuilde
/**
* Builds an ObjectMapperCqlStatementMapper.
*/
+ @Override
public ObjectMapperCqlStatementMapper build() {
List<TypeCodec<?>> codecs = codecProducers.stream().map(codecProducer -> {
try {
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseMapState.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseMapState.java
index 63169fb..191d7cb 100644
--- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseMapState.java
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseMapState.java
@@ -234,6 +234,7 @@ public class HBaseMapState<T> implements IBackingMap<T> {
}
@SuppressWarnings({ "rawtypes", "unchecked" })
+ @Override
public State makeState(Map<String, Object> conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
LOG.info("Preparing HBase State for partition {} of {}.", partitionIndex + 1, numPartitions);
IBackingMap state = new HBaseMapState(options, conf, partitionIndex);
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java
index 9981811..c4030b2 100644
--- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java
@@ -34,6 +34,7 @@ public class HBaseWindowsStoreFactory implements WindowsStoreFactory {
this.qualifier = qualifier;
}
+ @Override
public WindowsStore create(Map<String, Object> topoConf) {
Configuration configuration = HBaseConfiguration.create();
for (Map.Entry<String, Object> entry : config.entrySet()) {
diff --git a/external/storm-hdfs-blobstore/src/test/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImplTest.java b/external/storm-hdfs-blobstore/src/test/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImplTest.java
index 752e563..0d44f05 100644
--- a/external/storm-hdfs-blobstore/src/test/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImplTest.java
+++ b/external/storm-hdfs-blobstore/src/test/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImplTest.java
@@ -64,6 +64,7 @@ public class HdfsBlobStoreImplTest {
super(path, conf, hconf);
}
+ @Override
protected Path getKeyDir(String key) {
return new Path(new Path(blobDir, KEYDIR), key);
}
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java
index bcff104..156e58a 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java
@@ -91,6 +91,7 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt {
* @param topologyContext
* @param collector
*/
+ @Override
public final void prepare(Map<String, Object> conf, TopologyContext topologyContext, OutputCollector collector) {
this.writeLock = new Object();
if (this.syncPolicy == null) throw new IllegalStateException("SyncPolicy must be specified.");
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/DefaultFileNameFormat.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/DefaultFileNameFormat.java
index 14b4513..42c1f5b 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/DefaultFileNameFormat.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/DefaultFileNameFormat.java
@@ -74,6 +74,7 @@ public class DefaultFileNameFormat implements FileNameFormat {
return this.prefix + this.componentId + "-" + this.taskId + "-" + rotation + "-" + timeStamp + this.extension;
}
+ @Override
public String getPath() {
return this.path;
}
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AbstractHDFSWriter.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AbstractHDFSWriter.java
index d4a95ee..6c2b0e0 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AbstractHDFSWriter.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AbstractHDFSWriter.java
@@ -31,6 +31,7 @@ abstract public class AbstractHDFSWriter implements Writer {
this.filePath = path;
}
+ @Override
final public long write(Tuple tuple) throws IOException {
doWrite(tuple);
this.needsRotation = rotationPolicy.mark(tuple, offset);
@@ -38,18 +39,22 @@ abstract public class AbstractHDFSWriter implements Writer {
return this.offset;
}
+ @Override
final public void sync() throws IOException {
doSync();
}
+ @Override
final public void close() throws IOException {
doClose();
}
+ @Override
public boolean needsRotation() {
return needsRotation;
}
+ @Override
public Path getFilePath() {
return this.filePath;
}
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java
index c5a2f55..c6529a9 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java
@@ -302,6 +302,7 @@ public class FileLock {
return new LogEntry(Long.parseLong(fields[0]), fields[1], fields[2]);
}
+ @Override
public String toString() {
return eventTime + "," + componentID + "," + fileOffset;
}
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
index bdf9da3..18c469f 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
@@ -230,6 +230,7 @@ public class HdfsSpout extends BaseRichSpout {
return collector;
}
+ @Override
public void nextTuple() {
LOG.trace("Next Tuple {}", spoutId);
// 1) First re-emit any previously failed tuples (from retryList)
@@ -387,6 +388,7 @@ public class HdfsSpout extends BaseRichSpout {
}
@SuppressWarnings("deprecation")
+ @Override
public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
LOG.info("Opening HDFS Spout");
this.conf = conf;
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/SequenceFileReader.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/SequenceFileReader.java
index ab61c2b..39dccae 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/SequenceFileReader.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/SequenceFileReader.java
@@ -69,6 +69,7 @@ public class SequenceFileReader<Key extends Writable, Value extends Writable>
}
}
+ @Override
public List<Object> next() throws IOException, ParseException {
if (reader.next(key, value)) {
ArrayList<Object> result = new ArrayList<Object>(2);
@@ -88,6 +89,7 @@ public class SequenceFileReader<Key extends Writable, Value extends Writable>
}
}
+ @Override
public Offset getFileOffset() {
return offset;
}
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java
index cc5531e..94b40f3 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java
@@ -59,10 +59,12 @@ public class TextFileReader extends AbstractFileReader {
}
+ @Override
public Offset getFileOffset() {
return offset.clone();
}
+ @Override
public List<Object> next() throws IOException, ParseException {
String line = readLineAndTrackOffset(reader);
if (line != null) {
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/DefaultFileNameFormat.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/DefaultFileNameFormat.java
index 825a0f0..e97bf1c 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/DefaultFileNameFormat.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/DefaultFileNameFormat.java
@@ -72,6 +72,7 @@ public class DefaultFileNameFormat implements FileNameFormat {
return this.prefix + "-" + this.partitionIndex + "-" + rotation + "-" + timeStamp + this.extension;
}
+ @Override
public String getPath() {
return this.path;
}
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
index 97ad8f2..c3e89dc 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
@@ -776,6 +776,7 @@ public class TestHdfsSpout {
this.componentId = componentId;
}
+ @Override
public String getThisComponentId() {
return Integer.toString(componentId);
}
diff --git a/external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java b/external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java
index 96226d6..1980a96 100644
--- a/external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java
+++ b/external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java
@@ -255,6 +255,7 @@ public class JmsSpout extends BaseRichSpout {
* <p>When overridden, should always call {@code super}
* to finalize the active connections.
*/
+ @Override
public void close() {
try {
LOG.debug("Closing JMS connection.");
@@ -272,6 +273,7 @@ public class JmsSpout extends BaseRichSpout {
* <p>This method polls the queue that's being filled asynchronously by the
* jms connection, every {@link #POLL_INTERVAL_MS} seconds.
*/
+ @Override
public void nextTuple() {
try {
Message msg = consumer.receive(POLL_INTERVAL_MS);
diff --git a/external/storm-jms/src/test/java/org/apache/storm/jms/spout/MockJmsProvider.java b/external/storm-jms/src/test/java/org/apache/storm/jms/spout/MockJmsProvider.java
index ca9733d..c0a8790 100644
--- a/external/storm-jms/src/test/java/org/apache/storm/jms/spout/MockJmsProvider.java
+++ b/external/storm-jms/src/test/java/org/apache/storm/jms/spout/MockJmsProvider.java
@@ -46,6 +46,7 @@ public class MockJmsProvider implements JmsProvider {
*
* @throws Exception
*/
+ @Override
public ConnectionFactory connectionFactory() throws Exception {
return this.connectionFactory;
}
@@ -58,6 +59,7 @@ public class MockJmsProvider implements JmsProvider {
*
* @throws Exception
*/
+ @Override
public Destination destination() throws Exception {
return this.destination;
}
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
index a51e14c..1801037 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
@@ -293,6 +293,7 @@ public class KafkaSpoutConfig<K, V> extends CommonKafkaSpoutConfig<K, V> {
return this;
}
+ @Override
public KafkaSpoutConfig<K, V> build() {
return new KafkaSpoutConfig<>(this);
}
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RecordTranslator.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RecordTranslator.java
index 6361c90..12263bd 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RecordTranslator.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RecordTranslator.java
@@ -39,6 +39,7 @@ public interface RecordTranslator<K, V> extends Serializable, Func<ConsumerRecor
* Return {@code null} to discard an invalid {@link ConsumerRecord}
* if {@link Builder#setEmitNullTuples(boolean)} is set to {@code false}.
*/
+ @Override
List<Object> apply(ConsumerRecord<K,V> record);
/**
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoUpdateMapper.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoUpdateMapper.java
index 72328c9..1c9cd41 100644
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoUpdateMapper.java
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoUpdateMapper.java
@@ -39,6 +39,7 @@ public class SimpleMongoUpdateMapper extends SimpleMongoMapper implements MongoU
return new Document("$set", document);
}
+ @Override
public SimpleMongoUpdateMapper withFields(String... fields) {
this.fields = fields;
return this;
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoMapState.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoMapState.java
index b06d35d..0a3b100 100644
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoMapState.java
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoMapState.java
@@ -147,6 +147,7 @@ public class MongoMapState<T> implements IBackingMap<T> {
}
@SuppressWarnings({"rawtypes", "unchecked"})
+ @Override
public State makeState(Map<String, Object> conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
IBackingMap state = new MongoMapState(conf, options);
diff --git a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/mappers/ByteArrayMessageMapper.java b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/mappers/ByteArrayMessageMapper.java
index 1dd943d..a943f86 100644
--- a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/mappers/ByteArrayMessageMapper.java
+++ b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/mappers/ByteArrayMessageMapper.java
@@ -19,10 +19,12 @@ import org.apache.storm.tuple.Values;
public class ByteArrayMessageMapper implements MqttMessageMapper {
+ @Override
public Values toValues(MqttMessage message) {
return new Values(message.getTopic(), message.getMessage());
}
+ @Override
public Fields outputFields() {
return new Fields("topic", "message");
}
diff --git a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/mappers/StringMessageMapper.java b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/mappers/StringMessageMapper.java
index 341e52d..3f76b61 100644
--- a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/mappers/StringMessageMapper.java
+++ b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/mappers/StringMessageMapper.java
@@ -22,10 +22,12 @@ import org.apache.storm.tuple.Values;
* "topic" and "message", both of which are Strings.
*/
public class StringMessageMapper implements MqttMessageMapper {
+ @Override
public Values toValues(MqttMessage message) {
return new Values(message.getTopic(), new String(message.getMessage()));
}
+ @Override
public Fields outputFields() {
return new Fields("topic", "message");
}
diff --git a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/spout/MqttSpout.java b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/spout/MqttSpout.java
index 268a695..1e8cf46 100644
--- a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/spout/MqttSpout.java
+++ b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/spout/MqttSpout.java
@@ -76,14 +76,17 @@ public class MqttSpout implements IRichSpout, Listener {
return this.sequence;
}
+ @Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(this.type.outputFields());
}
+ @Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
+ @Override
public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
this.topologyName = (String) conf.get(Config.TOPOLOGY_NAME);
@@ -129,13 +132,16 @@ public class MqttSpout implements IRichSpout, Listener {
}
+ @Override
public void close() {
this.connection.disconnect(new DisconnectCallback());
}
+ @Override
public void activate() {
}
+ @Override
public void deactivate() {
}
@@ -147,6 +153,7 @@ public class MqttSpout implements IRichSpout, Listener {
* to have nextTuple sleep for a short amount of time (like a single millisecond)
* so as not to waste too much CPU.
*/
+ @Override
public void nextTuple() {
AckableMessage tm = this.incoming.poll();
if (tm != null) {
@@ -166,6 +173,7 @@ public class MqttSpout implements IRichSpout, Listener {
*
* @param msgId
*/
+ @Override
public void ack(Object msgId) {
AckableMessage msg = this.pending.remove(msgId);
this.connection.getDispatchQueue().execute(msg.ack());
@@ -178,6 +186,7 @@ public class MqttSpout implements IRichSpout, Listener {
*
* @param msgId
*/
+ @Override
public void fail(Object msgId) {
try {
this.incoming.put(this.pending.remove(msgId));
@@ -188,14 +197,17 @@ public class MqttSpout implements IRichSpout, Listener {
// ################# Listener Implementation ######################
+ @Override
public void onConnected() {
// this gets called repeatedly for no apparent reason, don't do anything
}
+ @Override
public void onDisconnected() {
// this gets called repeatedly for no apparent reason, don't do anything
}
+ @Override
public void onPublish(UTF8Buffer topic, Buffer payload, Runnable ack) {
LOG.debug("Received message: topic={}, payload={}", topic.toString(), new String(payload.toByteArray()));
try {
@@ -205,6 +217,7 @@ public class MqttSpout implements IRichSpout, Listener {
}
}
+ @Override
public void onFailure(Throwable throwable) {
LOG.error("MQTT Connection Failure.", throwable);
MqttSpout.this.connection.disconnect(new DisconnectCallback());
@@ -213,11 +226,13 @@ public class MqttSpout implements IRichSpout, Listener {
// ################# Connect Callback Implementation ######################
private class ConnectCallback implements Callback<Void> {
+ @Override
public void onSuccess(Void v) {
LOG.info("MQTT Connected. Subscribing to topic...");
MqttSpout.this.mqttConnected = true;
}
+ @Override
public void onFailure(Throwable throwable) {
LOG.info("MQTT Connection failed.");
MqttSpout.this.mqttConnectFailed = true;
@@ -226,10 +241,12 @@ public class MqttSpout implements IRichSpout, Listener {
// ################# Subscribe Callback Implementation ######################
private class SubscribeCallback implements Callback<byte[]> {
+ @Override
public void onSuccess(byte[] qos) {
LOG.info("Subscripton sucessful.");
}
+ @Override
public void onFailure(Throwable throwable) {
LOG.error("MQTT Subscripton failed.", throwable);
throw new RuntimeException("MQTT Subscribe failed.", throwable);
@@ -238,10 +255,12 @@ public class MqttSpout implements IRichSpout, Listener {
// ################# Disconnect Callback Implementation ######################
private class DisconnectCallback implements Callback<Void> {
+ @Override
public void onSuccess(Void aVoid) {
LOG.info("MQTT Disconnect successful.");
}
+ @Override
public void onFailure(Throwable throwable) {
// Disconnects don't fail.
}
diff --git a/external/storm-opentsdb/src/main/java/org/apache/storm/opentsdb/bolt/TupleOpenTsdbDatapointMapper.java b/external/storm-opentsdb/src/main/java/org/apache/storm/opentsdb/bolt/TupleOpenTsdbDatapointMapper.java
index 8953535..53a49a9 100644
--- a/external/storm-opentsdb/src/main/java/org/apache/storm/opentsdb/bolt/TupleOpenTsdbDatapointMapper.java
+++ b/external/storm-opentsdb/src/main/java/org/apache/storm/opentsdb/bolt/TupleOpenTsdbDatapointMapper.java
@@ -86,6 +86,7 @@ public final class TupleOpenTsdbDatapointMapper implements ITupleOpenTsdbDatapoi
return tagsField;
}
+ @Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof TupleOpenTsdbDatapointMapper)) return false;
diff --git a/flux/flux-examples/src/main/java/org/apache/storm/flux/examples/WordCounter.java b/flux/flux-examples/src/main/java/org/apache/storm/flux/examples/WordCounter.java
index 709c254..1604b79 100644
--- a/flux/flux-examples/src/main/java/org/apache/storm/flux/examples/WordCounter.java
+++ b/flux/flux-examples/src/main/java/org/apache/storm/flux/examples/WordCounter.java
@@ -43,6 +43,7 @@ public class WordCounter extends BaseBasicBolt {
+ @Override
public void prepare(Map<String, Object> topoConf, TopologyContext context) {
}
@@ -50,14 +51,17 @@ public class WordCounter extends BaseBasicBolt {
* Just output the word value with a count of 1.
* The HBaseBolt will handle incrementing the counter.
*/
+ @Override
public void execute(Tuple input, BasicOutputCollector collector) {
collector.emit(tuple(input.getValues().get(0), 1));
}
+ @Override
public void cleanup() {
}
+ @Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word", "count"));
}
diff --git a/pom.xml b/pom.xml
index 91a3293d..ab1a7fa 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1254,6 +1254,32 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-pmd-plugin</artifactId>
<version>3.12.0</version>
+ <configuration>
+ <rulesets>
+ <ruleset>storm/pmd-ruleset.xml</ruleset>
+ </rulesets>
+ <includeTests>true</includeTests>
+ <printFailingErrors>true</printFailingErrors>
+ <excludes>
+ <exclude>org/apache/storm/generated/**</exclude>
+ <exclude>org/apache/storm/sql/parser/impl/**</exclude>
+ </excludes>
+ </configuration>
+ <executions>
+ <execution>
+ <id>pmd-check</id>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ </execution>
+ </executions>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-checkstyle</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
</plugin>
<plugin>
<artifactId>maven-clean-plugin</artifactId>
diff --git a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/RexNodeToJavaCodeCompiler.java b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/RexNodeToJavaCodeCompiler.java
index 5c4cf64..589154a 100644
--- a/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/RexNodeToJavaCodeCompiler.java
+++ b/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/RexNodeToJavaCodeCompiler.java
@@ -158,6 +158,7 @@ public class RexNodeToJavaCodeCompiler {
JavaRowFormat.ARRAY, false))));
final Function1<String, RexToLixTranslator.InputGetter> correlates =
new Function1<String, RexToLixTranslator.InputGetter>() {
+ @Override
public RexToLixTranslator.InputGetter apply(String a0) {
throw new UnsupportedOperationException();
}
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/spout/SocketSpout.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/spout/SocketSpout.java
index b8743b9..38cb8e7 100644
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/spout/SocketSpout.java
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/spout/SocketSpout.java
@@ -157,6 +157,7 @@ public class SocketSpout implements IRichSpout {
}
private class SocketReaderRunnable implements Runnable {
+ @Override
public void run() {
while (running) {
try {
diff --git a/storm-checkstyle/src/main/resources/storm/pmd-ruleset.xml b/storm-checkstyle/src/main/resources/storm/pmd-ruleset.xml
new file mode 100644
index 0000000..83c0858
--- /dev/null
+++ b/storm-checkstyle/src/main/resources/storm/pmd-ruleset.xml
@@ -0,0 +1,28 @@
+<?xml version="1.0"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<ruleset name="Custom ruleset"
+ xmlns="http://pmd.sourceforge.net/ruleset/2.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://pmd.sourceforge.net/ruleset/2.0.0 http://pmd.sourceforge.net/ruleset_2_0_0.xsd">
+ <description>
+ The default ruleset for Apache Storm
+ </description>
+ <rule ref="category/java/bestpractices.xml/MissingOverride"/>
+</ruleset>
\ No newline at end of file
diff --git a/storm-client/src/jvm/org/apache/storm/ILocalDRPC.java b/storm-client/src/jvm/org/apache/storm/ILocalDRPC.java
index 93d88b7..1a8916d 100644
--- a/storm-client/src/jvm/org/apache/storm/ILocalDRPC.java
+++ b/storm-client/src/jvm/org/apache/storm/ILocalDRPC.java
@@ -27,5 +27,6 @@ public interface ILocalDRPC extends DistributedRPC.Iface, DistributedRPCInvocati
* @deprecated use {@link #close()} instead
*/
@Deprecated
+ @Override
public void shutdown();
}
diff --git a/storm-client/src/jvm/org/apache/storm/LogWriter.java b/storm-client/src/jvm/org/apache/storm/LogWriter.java
index 4546f10..0c03219 100644
--- a/storm-client/src/jvm/org/apache/storm/LogWriter.java
+++ b/storm-client/src/jvm/org/apache/storm/LogWriter.java
@@ -56,6 +56,7 @@ public class LogWriter extends Thread {
System.exit(ret);
}
+ @Override
public void run() {
Logger logger = this.logger;
BufferedReader in = this.in;
diff --git a/storm-client/src/jvm/org/apache/storm/assignments/ILocalAssignmentsBackend.java b/storm-client/src/jvm/org/apache/storm/assignments/ILocalAssignmentsBackend.java
index 5f90153..762842e 100644
--- a/storm-client/src/jvm/org/apache/storm/assignments/ILocalAssignmentsBackend.java
+++ b/storm-client/src/jvm/org/apache/storm/assignments/ILocalAssignmentsBackend.java
@@ -116,5 +116,6 @@ public interface ILocalAssignmentsBackend extends AutoCloseable {
/**
* Function to release resource.
*/
+ @Override
void close();
}
diff --git a/storm-client/src/jvm/org/apache/storm/blobstore/ClientBlobStore.java b/storm-client/src/jvm/org/apache/storm/blobstore/ClientBlobStore.java
index f01c74e..b9290cf 100644
--- a/storm-client/src/jvm/org/apache/storm/blobstore/ClientBlobStore.java
+++ b/storm-client/src/jvm/org/apache/storm/blobstore/ClientBlobStore.java
@@ -171,6 +171,7 @@ public abstract class ClientBlobStore implements Shutdownable, AutoCloseable {
*/
public abstract void createStateInZookeeper(String key);
+ @Override
public abstract void close();
/**
diff --git a/storm-client/src/jvm/org/apache/storm/coordination/CoordinatedBolt.java b/storm-client/src/jvm/org/apache/storm/coordination/CoordinatedBolt.java
index 0b40f2f..181e4ec 100644
--- a/storm-client/src/jvm/org/apache/storm/coordination/CoordinatedBolt.java
+++ b/storm-client/src/jvm/org/apache/storm/coordination/CoordinatedBolt.java
@@ -74,6 +74,7 @@ public class CoordinatedBolt implements IRichBolt {
return ret;
}
+ @Override
public void prepare(Map<String, Object> config, TopologyContext context, OutputCollector collector) {
TimeCacheMap.ExpiredCallback<Object, TrackingInfo> callback = null;
if (_delegate instanceof TimeoutCallback) {
@@ -167,6 +168,7 @@ public class CoordinatedBolt implements IRichBolt {
return failed;
}
+ @Override
public void execute(Tuple tuple) {
Object id = tuple.getValue(0);
TrackingInfo track;
@@ -201,11 +203,13 @@ public class CoordinatedBolt implements IRichBolt {
}
}
+ @Override
public void cleanup() {
_delegate.cleanup();
_tracked.cleanup();
}
+ @Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
_delegate.declareOutputFields(declarer);
declarer.declareStream(Constants.COORDINATED_STREAM_ID, true, new Fields("id", "count"));
@@ -306,17 +310,20 @@ public class CoordinatedBolt implements IRichBolt {
_delegate = delegate;
}
+ @Override
public List<Integer> emit(String stream, Collection<Tuple> anchors, List<Object> tuple) {
List<Integer> tasks = _delegate.emit(stream, anchors, tuple);
updateTaskCounts(tuple.get(0), tasks);
return tasks;
}
+ @Override
public void emitDirect(int task, String stream, Collection<Tuple> anchors, List<Object> tuple) {
updateTaskCounts(tuple.get(0), Arrays.asList(task));
_delegate.emitDirect(task, stream, anchors, tuple);
}
+ @Override
public void ack(Tuple tuple) {
Object id = tuple.getValue(0);
synchronized (_tracked) {
@@ -333,6 +340,7 @@ public class CoordinatedBolt implements IRichBolt {
}
}
+ @Override
public void fail(Tuple tuple) {
Object id = tuple.getValue(0);
synchronized (_tracked) {
@@ -345,14 +353,17 @@ public class CoordinatedBolt implements IRichBolt {
_delegate.fail(tuple);
}
+ @Override
public void flush() {
_delegate.flush();
}
+ @Override
public void resetTimeout(Tuple tuple) {
_delegate.resetTimeout(tuple);
}
+ @Override
public void reportError(Throwable error) {
_delegate.reportError(error);
}
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java b/storm-client/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java
index 5cd04f7..79b5af6 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java
@@ -73,6 +73,7 @@ public class AdvancedFSOps implements IAdvancedFSOps {
* @param dir the directory to change permissions on
* @throws IOException on any error
*/
+ @Override
public void restrictDirectoryPermissions(File dir) throws IOException {
Set<PosixFilePermission> perms = new HashSet<>(
Arrays.asList(PosixFilePermission.OWNER_READ, PosixFilePermission.OWNER_WRITE,
@@ -88,6 +89,7 @@ public class AdvancedFSOps implements IAdvancedFSOps {
* @param toDir where to move it to
* @throws IOException on any error
*/
+ @Override
public void moveDirectoryPreferAtomic(File fromDir, File toDir) throws IOException {
FileUtils.forceMkdir(toDir);
Files.move(fromDir.toPath(), toDir.toPath(), StandardCopyOption.ATOMIC_MOVE);
@@ -96,6 +98,7 @@ public class AdvancedFSOps implements IAdvancedFSOps {
/**
* @return true if an atomic directory move works, else false.
*/
+ @Override
public boolean supportsAtomicDirectoryMove() {
return true;
}
@@ -107,6 +110,7 @@ public class AdvancedFSOps implements IAdvancedFSOps {
* @param toDir to where
* @throws IOException on any error
*/
+ @Override
public void copyDirectory(File fromDir, File toDir) throws IOException {
FileUtils.copyDirectory(fromDir, toDir);
}
@@ -118,6 +122,7 @@ public class AdvancedFSOps implements IAdvancedFSOps {
* @param user the user to change the permissions for
* @throws IOException on any error
*/
+ @Override
public void setupBlobPermissions(File path, String user) throws IOException {
//Normally this is a NOOP
}
@@ -130,6 +135,7 @@ public class AdvancedFSOps implements IAdvancedFSOps {
* @param logPrefix if an external process needs to be launched to delete the object what prefix to include in the logs
* @throws IOException on any error.
*/
+ @Override
public void deleteIfExists(File path, String user, String logPrefix) throws IOException {
//by default no need to do this as a different user
deleteIfExists(path);
@@ -141,6 +147,7 @@ public class AdvancedFSOps implements IAdvancedFSOps {
* @param path what to delete
* @throws IOException on any error.
*/
+ @Override
public void deleteIfExists(File path) throws IOException {
LOG.info("Deleting path {}", path);
Path p = path.toPath();
@@ -159,6 +166,7 @@ public class AdvancedFSOps implements IAdvancedFSOps {
* @param path the directory to set the permissions on
* @throws IOException on any error
*/
+ @Override
public void setupStormCodeDir(String user, File path) throws IOException {
//By default this is a NOOP
}
@@ -170,6 +178,7 @@ public class AdvancedFSOps implements IAdvancedFSOps {
* @param path the directory to set the permissions on
* @throws IOException on any error
*/
+ @Override
public void setupWorkerArtifactsDir(String user, File path) throws IOException {
//By default this is a NOOP
}
@@ -183,6 +192,7 @@ public class AdvancedFSOps implements IAdvancedFSOps {
*
* @throws IOException on any error
*/
+ @Override
public boolean doRequiredTopoFilesExist(Map<String, Object> conf, String topologyId) throws IOException {
return ClientSupervisorUtils.doRequiredTopoFilesExist(conf, topologyId);
}
@@ -193,6 +203,7 @@ public class AdvancedFSOps implements IAdvancedFSOps {
* @param path the directory to create
* @throws IOException on any error
*/
+ @Override
public void forceMkdir(File path) throws IOException {
FileUtils.forceMkdir(path);
}
@@ -203,14 +214,17 @@ public class AdvancedFSOps implements IAdvancedFSOps {
* @param path the directory to create
* @throws IOException on any error
*/
+ @Override
public void forceMkdir(Path path) throws IOException {
Files.createDirectories(path);
}
+ @Override
public DirectoryStream<Path> newDirectoryStream(Path dir, DirectoryStream.Filter<? super Path> filter) throws IOException {
return Files.newDirectoryStream(dir, filter);
}
+ @Override
public DirectoryStream<Path> newDirectoryStream(Path dir) throws IOException {
return Files.newDirectoryStream(dir);
}
@@ -223,6 +237,7 @@ public class AdvancedFSOps implements IAdvancedFSOps {
*
* @throws IOException on any error.
*/
+ @Override
public boolean fileExists(File path) throws IOException {
return path.exists();
}
@@ -235,6 +250,7 @@ public class AdvancedFSOps implements IAdvancedFSOps {
*
* @throws IOException on any error.
*/
+ @Override
public boolean fileExists(Path path) throws IOException {
return Files.exists(path);
}
@@ -247,6 +263,7 @@ public class AdvancedFSOps implements IAdvancedFSOps {
*
* @throws IOException on any error
*/
+ @Override
public Writer getWriter(File file) throws IOException {
return new FileWriter(file);
}
@@ -259,6 +276,7 @@ public class AdvancedFSOps implements IAdvancedFSOps {
*
* @throws IOException on any error
*/
+ @Override
public OutputStream getOutputStream(File file) throws IOException {
return new FileOutputStream(file);
}
@@ -270,6 +288,7 @@ public class AdvancedFSOps implements IAdvancedFSOps {
* @param data the data to write
* @throws IOException on any error
*/
+ @Override
public void dump(File location, String data) throws IOException {
File parent = location.getParentFile();
if (!parent.exists()) {
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/supervisor/ClientSupervisorUtils.java b/storm-client/src/jvm/org/apache/storm/daemon/supervisor/ClientSupervisorUtils.java
index 8ee8b63..0aa5734 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/supervisor/ClientSupervisorUtils.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/supervisor/ClientSupervisorUtils.java
@@ -139,6 +139,7 @@ public class ClientSupervisorUtils {
}
if (logPrefix != null || exitCodeCallback != null) {
Utils.asyncLoop(new Callable<Long>() {
+ @Override
public Long call() {
if (logPrefix != null) {
Utils.readAndLogStream(logPrefix,
diff --git a/storm-client/src/jvm/org/apache/storm/drpc/DRPCInvocationsClient.java b/storm-client/src/jvm/org/apache/storm/drpc/DRPCInvocationsClient.java
index e30549b..e388de7 100644
--- a/storm-client/src/jvm/org/apache/storm/drpc/DRPCInvocationsClient.java
+++ b/storm-client/src/jvm/org/apache/storm/drpc/DRPCInvocationsClient.java
@@ -57,6 +57,7 @@ public class DRPCInvocationsClient extends ThriftClient implements DistributedRP
return client.get() != null;
}
+ @Override
public void result(String id, String result) throws TException, AuthorizationException {
DistributedRPCInvocations.Client c = client.get();
try {
@@ -72,6 +73,7 @@ public class DRPCInvocationsClient extends ThriftClient implements DistributedRP
}
}
+ @Override
public DRPCRequest fetchRequest(String func) throws TException, AuthorizationException {
DistributedRPCInvocations.Client c = client.get();
try {
@@ -87,6 +89,7 @@ public class DRPCInvocationsClient extends ThriftClient implements DistributedRP
}
}
+ @Override
public void failRequest(String id) throws TException, AuthorizationException {
DistributedRPCInvocations.Client c = client.get();
try {
diff --git a/storm-client/src/jvm/org/apache/storm/drpc/JoinResult.java b/storm-client/src/jvm/org/apache/storm/drpc/JoinResult.java
index d6d4201..543e22a 100644
--- a/storm-client/src/jvm/org/apache/storm/drpc/JoinResult.java
+++ b/storm-client/src/jvm/org/apache/storm/drpc/JoinResult.java
@@ -39,10 +39,12 @@ public class JoinResult extends BaseRichBolt {
this.returnComponent = returnComponent;
}
+ @Override
public void prepare(Map<String, Object> map, TopologyContext context, OutputCollector collector) {
_collector = collector;
}
+ @Override
public void execute(Tuple tuple) {
Object requestId = tuple.getValue(0);
if (tuple.getSourceComponent().equals(returnComponent)) {
@@ -64,6 +66,7 @@ public class JoinResult extends BaseRichBolt {
}
}
+ @Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("result", "return-info"));
}
diff --git a/storm-client/src/jvm/org/apache/storm/drpc/KeyedFairBolt.java b/storm-client/src/jvm/org/apache/storm/drpc/KeyedFairBolt.java
index 1ca5647..fc10f86 100644
--- a/storm-client/src/jvm/org/apache/storm/drpc/KeyedFairBolt.java
+++ b/storm-client/src/jvm/org/apache/storm/drpc/KeyedFairBolt.java
@@ -39,7 +39,7 @@ public class KeyedFairBolt implements IRichBolt, FinishedCallback {
this(new BasicBoltExecutor(delegate));
}
-
+ @Override
public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
if (_delegate instanceof FinishedCallback) {
_callback = (FinishedCallback) _delegate;
@@ -47,6 +47,7 @@ public class KeyedFairBolt implements IRichBolt, FinishedCallback {
_delegate.prepare(topoConf, context, collector);
_rrQueue = new KeyedRoundRobinQueue<Tuple>();
_executor = new Thread(new Runnable() {
+ @Override
public void run() {
try {
while (true) {
@@ -61,20 +62,24 @@ public class KeyedFairBolt implements IRichBolt, FinishedCallback {
_executor.start();
}
+ @Override
public void execute(Tuple input) {
Object key = input.getValue(0);
_rrQueue.add(key, input);
}
+ @Override
public void cleanup() {
_executor.interrupt();
_delegate.cleanup();
}
+ @Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
_delegate.declareOutputFields(declarer);
}
+ @Override
public void finishedId(Object id) {
if (_callback != null) {
_callback.finishedId(id);
diff --git a/storm-client/src/jvm/org/apache/storm/drpc/PrepareRequest.java b/storm-client/src/jvm/org/apache/storm/drpc/PrepareRequest.java
index d9895ad..a0a7cfb 100644
--- a/storm-client/src/jvm/org/apache/storm/drpc/PrepareRequest.java
+++ b/storm-client/src/jvm/org/apache/storm/drpc/PrepareRequest.java
@@ -46,6 +46,7 @@ public class PrepareRequest extends BaseBasicBolt {
collector.emit(ID_STREAM, new Values(requestId));
}
+ @Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declareStream(ARGS_STREAM, new Fields("request", "args"));
declarer.declareStream(RETURN_STREAM, new Fields("request", "return"));
diff --git a/storm-client/src/jvm/org/apache/storm/drpc/ReturnResults.java b/storm-client/src/jvm/org/apache/storm/drpc/ReturnResults.java
index cb0aaa8..b120b51 100644
--- a/storm-client/src/jvm/org/apache/storm/drpc/ReturnResults.java
+++ b/storm-client/src/jvm/org/apache/storm/drpc/ReturnResults.java
@@ -126,6 +126,7 @@ public class ReturnResults extends BaseRichBolt {
}
}
+ @Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
}
diff --git a/storm-client/src/jvm/org/apache/storm/executor/ExecutorShutdown.java b/storm-client/src/jvm/org/apache/storm/executor/ExecutorShutdown.java
index 37ea683..2a6d561 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/ExecutorShutdown.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/ExecutorShutdown.java
@@ -72,6 +72,7 @@ public class ExecutorShutdown implements Shutdownable, IRunningExecutor {
}
}
+ @Override
public void loadChanged(LoadMapping loadMapping) {
executor.reflectNewLoadMapping(loadMapping);
}
diff --git a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java
index b8a582b..5ea7838 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java
@@ -59,6 +59,7 @@ public class BoltOutputCollectorImpl implements IOutputCollector {
this.xsfer = executor.getExecutorTransfer();
}
+ @Override
public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) {
try {
return boltEmit(streamId, anchors, tuple, null);
diff --git a/storm-client/src/jvm/org/apache/storm/generated/NumErrorsChoice.java b/storm-client/src/jvm/org/apache/storm/generated/NumErrorsChoice.java
index ace4920..b14da19 100644
--- a/storm-client/src/jvm/org/apache/storm/generated/NumErrorsChoice.java
+++ b/storm-client/src/jvm/org/apache/storm/generated/NumErrorsChoice.java
@@ -39,6 +39,7 @@ public enum NumErrorsChoice implements org.apache.storm.thrift.TEnum {
/**
* Get the integer value of this enum value, as defined in the Thrift IDL.
*/
+ @Override
public int getValue() {
return value;
}
diff --git a/storm-client/src/jvm/org/apache/storm/generated/TopologyInitialStatus.java b/storm-client/src/jvm/org/apache/storm/generated/TopologyInitialStatus.java
index e55e7fa..7ccb6a0 100644
--- a/storm-client/src/jvm/org/apache/storm/generated/TopologyInitialStatus.java
+++ b/storm-client/src/jvm/org/apache/storm/generated/TopologyInitialStatus.java
@@ -38,6 +38,7 @@ public enum TopologyInitialStatus implements org.apache.storm.thrift.TEnum {
/**
* Get the integer value of this enum value, as defined in the Thrift IDL.
*/
+ @Override
public int getValue() {
return value;
}
diff --git a/storm-client/src/jvm/org/apache/storm/grouping/Load.java b/storm-client/src/jvm/org/apache/storm/grouping/Load.java
index db7f47e..2c6662d 100644
--- a/storm-client/src/jvm/org/apache/storm/grouping/Load.java
+++ b/storm-client/src/jvm/org/apache/storm/grouping/Load.java
@@ -64,6 +64,7 @@ public class Load {
return connectionLoad > boltLoad ? connectionLoad : boltLoad;
}
+ @Override
public String toString() {
return "[:load " + boltLoad + " " + connectionLoad + "]";
}
diff --git a/storm-client/src/jvm/org/apache/storm/grouping/PartialKeyGrouping.java b/storm-client/src/jvm/org/apache/storm/grouping/PartialKeyGrouping.java
index d42c123..bba635e 100644
--- a/storm-client/src/jvm/org/apache/storm/grouping/PartialKeyGrouping.java
+++ b/storm-client/src/jvm/org/apache/storm/grouping/PartialKeyGrouping.java
@@ -156,6 +156,7 @@ public class PartialKeyGrouping implements CustomStreamGrouping, Serializable {
/**
* Creates a two task assignment by selecting random tasks.
*/
+ @Override
public int[] createAssignment(List<Integer> tasks, byte[] key) {
// It is necessary that this produce a deterministic assignment based on the key, so seed the Random from the key
final long seedForRandom = Arrays.hashCode(key);
@@ -178,6 +179,7 @@ public class PartialKeyGrouping implements CustomStreamGrouping, Serializable {
/**
* Chooses one of the incoming tasks and selects the one that has been selected the fewest times so far.
*/
+ @Override
public Integer chooseTask(int[] assignedTasks) {
Integer taskIdWithMinLoad = null;
Long minTaskLoad = Long.MAX_VALUE;
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java
index 61a9c99..e76f9d5 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java
@@ -177,6 +177,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
// netty TimerTask is already defined and hence a fully
// qualified name
TIMER.schedule(new java.util.TimerTask() {
+ @Override
public void run() {
try {
LOG.debug("running timer task, address {}", dstAddress);
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyClient.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyClient.java
index 7f2adb5..352c5b3 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyClient.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyClient.java
@@ -107,6 +107,7 @@ public class KerberosSaslNettyClient {
final CallbackHandler fch = ch;
LOG.debug("Kerberos Client with principal: {}, host: {}", fPrincipalName, fHost);
saslClient = Subject.doAs(subject, new PrivilegedExceptionAction<SaslClient>() {
+ @Override
public SaslClient run() {
try {
Map<String, String> props = new TreeMap<String, String>();
@@ -146,6 +147,7 @@ public class KerberosSaslNettyClient {
try {
final SaslMessageToken fSaslTokenMessage = saslTokenMessage;
byte[] retval = Subject.doAs(subject, new PrivilegedExceptionAction<byte[]>() {
+ @Override
public byte[] run() {
try {
byte[] retval = saslClient.evaluateChallenge(fSaslTokenMessage
@@ -176,6 +178,7 @@ public class KerberosSaslNettyClient {
* @param callbacks objects that indicate what credential information the server's SaslServer requires from the client.
* @throws UnsupportedCallbackException
*/
+ @Override
public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
for (Callback callback : callbacks) {
LOG.info("Kerberos Client Callback Handler got callback: {}", callback.getClass());
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyServer.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyServer.java
index 3321e8f..466ee6d 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyServer.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyServer.java
@@ -97,6 +97,7 @@ class KerberosSaslNettyServer {
LOG.debug("Server with host: {}", fHost);
saslServer =
Subject.doAs(subject, new PrivilegedExceptionAction<SaslServer>() {
+ @Override
public SaslServer run() {
try {
Map<String, String> props = new TreeMap<String, String>();
@@ -136,6 +137,7 @@ class KerberosSaslNettyServer {
public byte[] response(final byte[] token) {
try {
byte[] retval = Subject.doAs(subject, new PrivilegedExceptionAction<byte[]>() {
+ @Override
public byte[] run() {
try {
LOG.debug("response: Responding to input token of length: {}",
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/Login.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/Login.java
index 1530d37..f9e42f0 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/Login.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/Login.java
@@ -111,6 +111,7 @@ public class Login {
// you can decrease the interval of expiration of tickets (for example, to 3 minutes) by running :
// "modprinc -maxlife 3mins <principal>" in kadmin.
t = new Thread(new Runnable() {
+ @Override
public void run() {
LOG.info("TGT refresh thread started.");
while (true) { // renewal thread's main loop. if it exits from here, thread will exit.
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslNettyClient.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslNettyClient.java
index c55f2a9..11f487d 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslNettyClient.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslNettyClient.java
@@ -108,6 +108,7 @@ public class SaslNettyClient {
* @param callbacks objects that indicate what credential information the server's SaslServer requires from the client.
* @throws UnsupportedCallbackException
*/
+ @Override
public void handle(Callback[] callbacks)
throws UnsupportedCallbackException {
NameCallback nc = null;
diff --git a/storm-client/src/jvm/org/apache/storm/metric/api/AssignableMetric.java b/storm-client/src/jvm/org/apache/storm/metric/api/AssignableMetric.java
index f249427..8e980ad 100644
--- a/storm-client/src/jvm/org/apache/storm/metric/api/AssignableMetric.java
+++ b/storm-client/src/jvm/org/apache/storm/metric/api/AssignableMetric.java
@@ -23,6 +23,7 @@ public class AssignableMetric implements IMetric {
_value = value;
}
+ @Override
public Object getValueAndReset() {
return _value;
}
diff --git a/storm-client/src/jvm/org/apache/storm/metric/api/CombinedMetric.java b/storm-client/src/jvm/org/apache/storm/metric/api/CombinedMetric.java
index bf50480..944533f 100644
--- a/storm-client/src/jvm/org/apache/storm/metric/api/CombinedMetric.java
+++ b/storm-client/src/jvm/org/apache/storm/metric/api/CombinedMetric.java
@@ -25,6 +25,7 @@ public class CombinedMetric implements IMetric {
_value = _combiner.combine(_value, value);
}
+ @Override
public Object getValueAndReset() {
Object ret = _value;
_value = _combiner.identity();
diff --git a/storm-client/src/jvm/org/apache/storm/metric/api/CountMetric.java b/storm-client/src/jvm/org/apache/storm/metric/api/CountMetric.java
index 10d216a..8284ed9 100644
--- a/storm-client/src/jvm/org/apache/storm/metric/api/CountMetric.java
+++ b/storm-client/src/jvm/org/apache/storm/metric/api/CountMetric.java
@@ -26,6 +26,7 @@ public class CountMetric implements IMetric {
_value += incrementBy;
}
+ @Override
public Object getValueAndReset() {
long ret = _value;
_value = 0;
diff --git a/storm-client/src/jvm/org/apache/storm/metric/api/MeanReducer.java b/storm-client/src/jvm/org/apache/storm/metric/api/MeanReducer.java
index 0af91f5..e78ddb8 100644
--- a/storm-client/src/jvm/org/apache/storm/metric/api/MeanReducer.java
+++ b/storm-client/src/jvm/org/apache/storm/metric/api/MeanReducer.java
@@ -24,10 +24,12 @@ class MeanReducerState {
}
public class MeanReducer implements IReducer<MeanReducerState> {
+ @Override
public MeanReducerState init() {
return new MeanReducerState();
}
+ @Override
public MeanReducerState reduce(MeanReducerState acc, Object input) {
acc.count++;
if (input instanceof Double) {
@@ -44,6 +46,7 @@ public class MeanReducer implements IReducer<MeanReducerState> {
return acc;
}
+ @Override
public Object extractResult(MeanReducerState acc) {
if (acc.count > 0) {
return acc.sum / (double) acc.count;
diff --git a/storm-client/src/jvm/org/apache/storm/metric/api/MultiCountMetric.java b/storm-client/src/jvm/org/apache/storm/metric/api/MultiCountMetric.java
index 65c9c32..cc2dce2 100644
--- a/storm-client/src/jvm/org/apache/storm/metric/api/MultiCountMetric.java
+++ b/storm-client/src/jvm/org/apache/storm/metric/api/MultiCountMetric.java
@@ -29,6 +29,7 @@ public class MultiCountMetric implements IMetric {
return val;
}
+ @Override
public Map<String, Object> getValueAndReset() {
Map<String, Object> ret = new HashMap<>();
for (Map.Entry<String, CountMetric> e : _value.entrySet()) {
diff --git a/storm-client/src/jvm/org/apache/storm/metric/api/MultiReducedMetric.java b/storm-client/src/jvm/org/apache/storm/metric/api/MultiReducedMetric.java
index d5d807a..079b320 100644
--- a/storm-client/src/jvm/org/apache/storm/metric/api/MultiReducedMetric.java
+++ b/storm-client/src/jvm/org/apache/storm/metric/api/MultiReducedMetric.java
@@ -31,6 +31,7 @@ public class MultiReducedMetric implements IMetric {
return val;
}
+ @Override
public Map<String, Object> getValueAndReset() {
Map<String, Object> ret = new HashMap<>();
for (Map.Entry<String, ReducedMetric> e : _value.entrySet()) {
diff --git a/storm-client/src/jvm/org/apache/storm/metric/api/ReducedMetric.java b/storm-client/src/jvm/org/apache/storm/metric/api/ReducedMetric.java
index 6b0041e..92ee6ff 100644
--- a/storm-client/src/jvm/org/apache/storm/metric/api/ReducedMetric.java
+++ b/storm-client/src/jvm/org/apache/storm/metric/api/ReducedMetric.java
@@ -25,6 +25,7 @@ public class ReducedMetric implements IMetric {
_accumulator = _reducer.reduce(_accumulator, value);
}
+ @Override
public Object getValueAndReset() {
Object ret = _reducer.extractResult(_accumulator);
_accumulator = _reducer.init();
diff --git a/storm-client/src/jvm/org/apache/storm/metric/api/rpc/AssignableShellMetric.java b/storm-client/src/jvm/org/apache/storm/metric/api/rpc/AssignableShellMetric.java
index 7e59516..ec74f71 100644
--- a/storm-client/src/jvm/org/apache/storm/metric/api/rpc/AssignableShellMetric.java
+++ b/storm-client/src/jvm/org/apache/storm/metric/api/rpc/AssignableShellMetric.java
@@ -19,6 +19,7 @@ public class AssignableShellMetric extends AssignableMetric implements IShellMet
super(value);
}
+ @Override
public void updateMetricFromRPC(Object value) {
setValue(value);
}
diff --git a/storm-client/src/jvm/org/apache/storm/metric/api/rpc/CombinedShellMetric.java b/storm-client/src/jvm/org/apache/storm/metric/api/rpc/CombinedShellMetric.java
index decf63a..bffd1e7 100644
--- a/storm-client/src/jvm/org/apache/storm/metric/api/rpc/CombinedShellMetric.java
+++ b/storm-client/src/jvm/org/apache/storm/metric/api/rpc/CombinedShellMetric.java
@@ -20,6 +20,7 @@ public class CombinedShellMetric extends CombinedMetric implements IShellMetric
super(combiner);
}
+ @Override
public void updateMetricFromRPC(Object value) {
update(value);
}
diff --git a/storm-client/src/jvm/org/apache/storm/metric/api/rpc/CountShellMetric.java b/storm-client/src/jvm/org/apache/storm/metric/api/rpc/CountShellMetric.java
index 1924e94..abc2074 100644
--- a/storm-client/src/jvm/org/apache/storm/metric/api/rpc/CountShellMetric.java
+++ b/storm-client/src/jvm/org/apache/storm/metric/api/rpc/CountShellMetric.java
@@ -20,6 +20,7 @@ public class CountShellMetric extends CountMetric implements IShellMetric {
* if value is null, it will call incr()
* if value is long, it will call incrBy((long)params)
* */
+ @Override
public void updateMetricFromRPC(Object value) {
if (value == null) {
incr();
diff --git a/storm-client/src/jvm/org/apache/storm/metric/api/rpc/ReducedShellMetric.java b/storm-client/src/jvm/org/apache/storm/metric/api/rpc/ReducedShellMetric.java
index be67280..f7abddb 100644
--- a/storm-client/src/jvm/org/apache/storm/metric/api/rpc/ReducedShellMetric.java
+++ b/storm-client/src/jvm/org/apache/storm/metric/api/rpc/ReducedShellMetric.java
@@ -21,6 +21,7 @@ public class ReducedShellMetric extends ReducedMetric implements IShellMetric {
super(reducer);
}
+ @Override
public void updateMetricFromRPC(Object value) {
update(value);
}
diff --git a/storm-client/src/jvm/org/apache/storm/metric/internal/CountStatAndMetric.java b/storm-client/src/jvm/org/apache/storm/metric/internal/CountStatAndMetric.java
index abce348..d8112db 100644
--- a/storm-client/src/jvm/org/apache/storm/metric/internal/CountStatAndMetric.java
+++ b/storm-client/src/jvm/org/apache/storm/metric/internal/CountStatAndMetric.java
@@ -187,6 +187,7 @@ public class CountStatAndMetric implements IMetric {
}
private class Fresher extends TimerTask {
+ @Override
public void run() {
rotateSched(System.currentTimeMillis());
}
diff --git a/storm-client/src/jvm/org/apache/storm/metric/internal/LatencyStatAndMetric.java b/storm-client/src/jvm/org/apache/storm/metric/internal/LatencyStatAndMetric.java
index e7d4636..c66f2a1 100644
--- a/storm-client/src/jvm/org/apache/storm/metric/internal/LatencyStatAndMetric.java
+++ b/storm-client/src/jvm/org/apache/storm/metric/internal/LatencyStatAndMetric.java
@@ -245,6 +245,7 @@ public class LatencyStatAndMetric implements IMetric {
}
private class Fresher extends TimerTask {
+ @Override
public void run() {
rotateSched(System.currentTimeMillis());
}
diff --git a/storm-client/src/jvm/org/apache/storm/metric/internal/RateTracker.java b/storm-client/src/jvm/org/apache/storm/metric/internal/RateTracker.java
index 30a8d02..d0bbc74 100644
--- a/storm-client/src/jvm/org/apache/storm/metric/internal/RateTracker.java
+++ b/storm-client/src/jvm/org/apache/storm/metric/internal/RateTracker.java
@@ -130,6 +130,7 @@ public class RateTracker implements Closeable {
}
private class Fresher extends TimerTask {
+ @Override
public void run() {
rotateBuckets(System.currentTimeMillis());
}
diff --git a/storm-client/src/jvm/org/apache/storm/multilang/JsonSerializer.java b/storm-client/src/jvm/org/apache/storm/multilang/JsonSerializer.java
index b012252..6b44d19 100644
--- a/storm-client/src/jvm/org/apache/storm/multilang/JsonSerializer.java
+++ b/storm-client/src/jvm/org/apache/storm/multilang/JsonSerializer.java
@@ -39,6 +39,7 @@ public class JsonSerializer implements ISerializer {
private transient BufferedWriter processIn;
private transient BufferedReader processOut;
+ @Override
public void initialize(OutputStream processIn, InputStream processOut) {
try {
this.processIn = new BufferedWriter(new OutputStreamWriter(processIn, DEFAULT_CHARSET));
@@ -48,6 +49,7 @@ public class JsonSerializer implements ISerializer {
}
}
+ @Override
public Number connect(Map<String, Object> conf, TopologyContext context)
throws IOException, NoOutputException {
JSONObject setupInfo = new JSONObject();
@@ -60,6 +62,7 @@ public class JsonSerializer implements ISerializer {
return pid;
}
+ @Override
public void writeBoltMsg(BoltMsg boltMsg) throws IOException {
JSONObject obj = new JSONObject();
obj.put("id", boltMsg.getId());
@@ -70,6 +73,7 @@ public class JsonSerializer implements ISerializer {
writeMessage(obj);
}
+ @Override
public void writeSpoutMsg(SpoutMsg msg) throws IOException {
JSONObject obj = new JSONObject();
obj.put("command", msg.getCommand());
@@ -77,6 +81,7 @@ public class JsonSerializer implements ISerializer {
writeMessage(obj);
}
+ @Override
public void writeTaskIds(List<Integer> taskIds) throws IOException {
writeMessage(taskIds);
}
@@ -91,6 +96,7 @@ public class JsonSerializer implements ISerializer {
processIn.flush();
}
+ @Override
public ShellMsg readShellMsg() throws IOException, NoOutputException {
JSONObject msg = (JSONObject) readMessage();
ShellMsg shellMsg = new ShellMsg();
diff --git a/storm-client/src/jvm/org/apache/storm/pacemaker/PacemakerClient.java b/storm-client/src/jvm/org/apache/storm/pacemaker/PacemakerClient.java
index e01c7c0..641e326 100644
--- a/storm-client/src/jvm/org/apache/storm/pacemaker/PacemakerClient.java
+++ b/storm-client/src/jvm/org/apache/storm/pacemaker/PacemakerClient.java
@@ -145,10 +145,12 @@ public class PacemakerClient implements ISaslClient {
this.notifyAll();
}
+ @Override
public String name() {
return client_name;
}
+ @Override
public String secretKey() {
return secret;
}
@@ -231,6 +233,7 @@ public class PacemakerClient implements ISaslClient {
public void reconnect() {
final PacemakerClient client = this;
timer.schedule(new TimerTask() {
+ @Override
public void run() {
client.doReconnect();
}
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/AutoSSL.java b/storm-client/src/jvm/org/apache/storm/security/auth/AutoSSL.java
index 480108d..73c4b3b 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/AutoSSL.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/AutoSSL.java
@@ -83,6 +83,7 @@ public class AutoSSL implements IAutoCredentials {
}
}
+ @Override
public void prepare(Map<String, Object> conf) {
this.conf = conf;
writeDir = getSSLWriteDirFromConf(this.conf);
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/DefaultPrincipalToLocal.java b/storm-client/src/jvm/org/apache/storm/security/auth/DefaultPrincipalToLocal.java
index 37e3a6a..3e95613 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/DefaultPrincipalToLocal.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/DefaultPrincipalToLocal.java
@@ -21,6 +21,7 @@ public class DefaultPrincipalToLocal implements IPrincipalToLocal {
/**
* Invoked once immediately after construction
*/
+ @Override
public void prepare(Map<String, Object> topoConf) {
}
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/KerberosPrincipalToLocal.java b/storm-client/src/jvm/org/apache/storm/security/auth/KerberosPrincipalToLocal.java
index b46c860..ea341f0 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/KerberosPrincipalToLocal.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/KerberosPrincipalToLocal.java
@@ -24,6 +24,7 @@ public class KerberosPrincipalToLocal implements IPrincipalToLocal {
*
* @param topoConf Storm configuration
*/
+ @Override
public void prepare(Map<String, Object> topoConf) {
}
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/SimpleTransportPlugin.java b/storm-client/src/jvm/org/apache/storm/security/auth/SimpleTransportPlugin.java
index 059ebda..46617e0 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/SimpleTransportPlugin.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/SimpleTransportPlugin.java
@@ -127,6 +127,7 @@ public class SimpleTransportPlugin implements ITransportPlugin {
this.wrapped = wrapped;
}
+ @Override
public boolean process(final TProtocol inProt, final TProtocol outProt) throws TException {
//populating request context
ReqContext req_context = ReqContext.context();
@@ -152,10 +153,12 @@ public class SimpleTransportPlugin implements ITransportPlugin {
if (user != null) {
HashSet<Principal> principals = new HashSet<>();
principals.add(new Principal() {
+ @Override
public String getName() {
return user;
}
+ @Override
public String toString() {
return user;
}
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/authorizer/DenyAuthorizer.java b/storm-client/src/jvm/org/apache/storm/security/auth/authorizer/DenyAuthorizer.java
index de16e5b..e51a7b0 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/authorizer/DenyAuthorizer.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/authorizer/DenyAuthorizer.java
@@ -26,6 +26,7 @@ public class DenyAuthorizer implements IAuthorizer {
*
* @param conf Storm configuration
*/
+ @Override
public void prepare(Map<String, Object> conf) {
}
@@ -37,6 +38,7 @@ public class DenyAuthorizer implements IAuthorizer {
* @param topoConf configuration of targeted topology
* @return true if the request is authorized, false if reject
*/
+ @Override
public boolean permit(ReqContext context, String operation, Map<String, Object> topoConf) {
return false;
}
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/authorizer/NoopAuthorizer.java b/storm-client/src/jvm/org/apache/storm/security/auth/authorizer/NoopAuthorizer.java
index a0d95da..c984f3a 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/authorizer/NoopAuthorizer.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/authorizer/NoopAuthorizer.java
@@ -26,6 +26,7 @@ public class NoopAuthorizer implements IAuthorizer {
*
* @param conf Storm configuration
*/
+ @Override
public void prepare(Map<String, Object> conf) {
}
@@ -37,6 +38,7 @@ public class NoopAuthorizer implements IAuthorizer {
* @param topoConf configuration of targeted topology
* @return true if the request is authorized, false if reject
*/
+ @Override
public boolean permit(ReqContext context, String operation, Map<String, Object> topoConf) {
return true;
}
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/digest/DigestSaslTransportPlugin.java b/storm-client/src/jvm/org/apache/storm/security/auth/digest/DigestSaslTransportPlugin.java
index 463b841..e3e4497 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/digest/DigestSaslTransportPlugin.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/digest/DigestSaslTransportPlugin.java
@@ -36,6 +36,7 @@ public class DigestSaslTransportPlugin extends SaslTransportPlugin {
private static final Logger LOG = LoggerFactory.getLogger(DigestSaslTransportPlugin.class);
private WorkerTokenAuthorizer workerTokenAuthorizer;
+ @Override
protected TTransportFactory getServerTransportFactory(boolean impersonationAllowed) throws IOException {
if (workerTokenAuthorizer == null) {
workerTokenAuthorizer = new WorkerTokenAuthorizer(conf, type);
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/AutoTGTKrb5LoginModule.java b/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/AutoTGTKrb5LoginModule.java
index 7dd8b7c..e7b6eac 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/AutoTGTKrb5LoginModule.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/AutoTGTKrb5LoginModule.java
@@ -33,6 +33,7 @@ public class AutoTGTKrb5LoginModule implements LoginModule {
// initial state
private Subject subject;
+ @Override
public void initialize(Subject subject,
CallbackHandler callbackHandler,
Map<String, ?> sharedState,
@@ -41,6 +42,7 @@ public class AutoTGTKrb5LoginModule implements LoginModule {
this.subject = subject;
}
+ @Override
public boolean login() throws LoginException {
LOG.debug("Acquire TGT from Cache");
getKerbTicketFromCache();
@@ -62,6 +64,7 @@ public class AutoTGTKrb5LoginModule implements LoginModule {
return null;
}
+ @Override
public boolean commit() throws LoginException {
if (isSucceeded() == false) {
return false;
@@ -80,6 +83,7 @@ public class AutoTGTKrb5LoginModule implements LoginModule {
return true;
}
+ @Override
public boolean abort() throws LoginException {
if (isSucceeded() == false) {
return false;
@@ -88,6 +92,7 @@ public class AutoTGTKrb5LoginModule implements LoginModule {
}
}
+ @Override
public boolean logout() throws LoginException {
if (subject != null && !subject.isReadOnly() && kerbTicket != null) {
subject.getPrincipals().remove(kerbTicket.getClient());
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/ClientCallbackHandler.java b/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/ClientCallbackHandler.java
index a426840..bf06d3c 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/ClientCallbackHandler.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/ClientCallbackHandler.java
@@ -57,6 +57,7 @@ public class ClientCallbackHandler implements CallbackHandler {
*
* @param callbacks a collection of challenge callbacks
*/
+ @Override
public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
for (Callback c : callbacks) {
if (c instanceof NameCallback) {
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/KerberosSaslTransportPlugin.java b/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/KerberosSaslTransportPlugin.java
index 27ea878..7cf109b 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/KerberosSaslTransportPlugin.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/KerberosSaslTransportPlugin.java
@@ -205,6 +205,7 @@ public class KerberosSaslTransportPlugin extends SaslTransportPlugin {
try {
Subject.doAs(subject,
new PrivilegedExceptionAction<Void>() {
+ @Override
public Void run() {
try {
LOG.debug("do as:" + principal);
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/ServerCallbackHandler.java b/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/ServerCallbackHandler.java
index 139ad54..20595a9 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/ServerCallbackHandler.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/ServerCallbackHandler.java
@@ -50,6 +50,7 @@ public class ServerCallbackHandler implements CallbackHandler {
}
}
+ @Override
public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
NameCallback nc = null;
PasswordCallback pc = null;
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/sasl/SaslTransportPlugin.java b/storm-client/src/jvm/org/apache/storm/security/auth/sasl/SaslTransportPlugin.java
index 1694caa..f7cf596 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/sasl/SaslTransportPlugin.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/sasl/SaslTransportPlugin.java
@@ -124,6 +124,7 @@ public abstract class SaslTransportPlugin implements ITransportPlugin, Closeable
this.wrapped = wrapped;
}
+ @Override
public boolean process(final TProtocol inProt, final TProtocol outProt) throws TException {
//populating request context
ReqContext reqContext = ReqContext.context();
@@ -163,6 +164,7 @@ public abstract class SaslTransportPlugin implements ITransportPlugin, Closeable
/**
* Get the full name of the user.
*/
+ @Override
public String getName() {
return name;
}
diff --git a/storm-client/src/jvm/org/apache/storm/serialization/DefaultKryoFactory.java b/storm-client/src/jvm/org/apache/storm/serialization/DefaultKryoFactory.java
index 7ce0708..336fa45 100644
--- a/storm-client/src/jvm/org/apache/storm/serialization/DefaultKryoFactory.java
+++ b/storm-client/src/jvm/org/apache/storm/serialization/DefaultKryoFactory.java
@@ -32,6 +32,7 @@ public class DefaultKryoFactory implements IKryoFactory {
public void preRegister(Kryo k, Map<String, Object> conf) {
}
+ @Override
public void postRegister(Kryo k, Map<String, Object> conf) {
((KryoSerializableDefault) k).overrideDefault(true);
}
diff --git a/storm-client/src/jvm/org/apache/storm/serialization/KryoTupleSerializer.java b/storm-client/src/jvm/org/apache/storm/serialization/KryoTupleSerializer.java
index 39a601f..ad89438 100644
--- a/storm-client/src/jvm/org/apache/storm/serialization/KryoTupleSerializer.java
+++ b/storm-client/src/jvm/org/apache/storm/serialization/KryoTupleSerializer.java
@@ -29,6 +29,7 @@ public class KryoTupleSerializer implements ITupleSerializer {
_ids = new SerializationFactory.IdDictionary(context.getRawTopology());
}
+ @Override
public byte[] serialize(Tuple tuple) {
try {
diff --git a/storm-client/src/jvm/org/apache/storm/spout/RawScheme.java b/storm-client/src/jvm/org/apache/storm/spout/RawScheme.java
index 5b7df4e..732dbd4 100644
--- a/storm-client/src/jvm/org/apache/storm/spout/RawScheme.java
+++ b/storm-client/src/jvm/org/apache/storm/spout/RawScheme.java
@@ -24,12 +24,14 @@ import org.apache.storm.tuple.Fields;
import org.apache.storm.utils.Utils;
public class RawScheme implements Scheme {
+ @Override
public List<Object> deserialize(ByteBuffer ser) {
// Maintain backward compatibility for 0.10
byte[] b = Utils.toByteArray(ser);
return Utils.tuple(new Object[]{ b });
}
+ @Override
public Fields getOutputFields() {
return new Fields("bytes");
}
diff --git a/storm-client/src/jvm/org/apache/storm/spout/ShellSpout.java b/storm-client/src/jvm/org/apache/storm/spout/ShellSpout.java
index 6409e5c..317aec1 100644
--- a/storm-client/src/jvm/org/apache/storm/spout/ShellSpout.java
+++ b/storm-client/src/jvm/org/apache/storm/spout/ShellSpout.java
@@ -87,6 +87,7 @@ public class ShellSpout implements ISpout {
this.changeDirectory = changeDirectory;
}
+ @Override
public void open(Map<String, Object> topoConf, TopologyContext context,
SpoutOutputCollector collector) {
_collector = collector;
@@ -112,20 +113,24 @@ public class ShellSpout implements ISpout {
heartBeatExecutorService = MoreExecutors.getExitingScheduledExecutorService(new ScheduledThreadPoolExecutor(1));
}
+ @Override
public void close() {
heartBeatExecutorService.shutdownNow();
_process.destroy();
_running = false;
}
+ @Override
public void nextTuple() {
this.sendSyncCommand("next", "");
}
+ @Override
public void ack(Object msgId) {
this.sendSyncCommand("ack", msgId);
}
+ @Override
public void fail(Object msgId) {
this.sendSyncCommand("fail", msgId);
}
diff --git a/storm-client/src/jvm/org/apache/storm/spout/SpoutOutputCollector.java b/storm-client/src/jvm/org/apache/storm/spout/SpoutOutputCollector.java
index 72b1ade..f8895e6 100644
--- a/storm-client/src/jvm/org/apache/storm/spout/SpoutOutputCollector.java
+++ b/storm-client/src/jvm/org/apache/storm/spout/SpoutOutputCollector.java
@@ -38,6 +38,7 @@ public class SpoutOutputCollector implements ISpoutOutputCollector {
*
* @return the list of task ids that this tuple was sent to
*/
+ @Override
public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
return _delegate.emit(streamId, tuple, messageId);
}
@@ -77,6 +78,7 @@ public class SpoutOutputCollector implements ISpoutOutputCollector {
* functionality will only work if the messageId is serializable via Kryo or the Serializable interface. The emitted values must be
* immutable.
*/
+ @Override
public void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId) {
_delegate.emitDirect(taskId, streamId, tuple, messageId);
}
diff --git a/storm-client/src/jvm/org/apache/storm/state/BaseBinaryStateIterator.java b/storm-client/src/jvm/org/apache/storm/state/BaseBinaryStateIterator.java
index b122635..ee32e6a 100644
--- a/storm-client/src/jvm/org/apache/storm/state/BaseBinaryStateIterator.java
+++ b/storm-client/src/jvm/org/apache/storm/state/BaseBinaryStateIterator.java
@@ -40,6 +40,7 @@ public abstract class BaseBinaryStateIterator<K, V> extends BaseStateIterator<K,
*
* @return Iterator of loaded state KVs
*/
+ @Override
protected abstract Iterator<Map.Entry<byte[], byte[]>> loadChunkFromStateStorage();
/**
@@ -47,6 +48,7 @@ public abstract class BaseBinaryStateIterator<K, V> extends BaseStateIterator<K,
*
* @return whether end of data is reached from storage state KVs
*/
+ @Override
protected abstract boolean isEndOfDataFromStorage();
/**
@@ -55,6 +57,7 @@ public abstract class BaseBinaryStateIterator<K, V> extends BaseStateIterator<K,
* @param key byte array encoded key
* @return Decoded value of key
*/
+ @Override
protected abstract K decodeKey(byte[] key);
/**
@@ -63,6 +66,7 @@ public abstract class BaseBinaryStateIterator<K, V> extends BaseStateIterator<K,
* @param value byte array encoded value
* @return Decoded value of value
*/
+ @Override
protected abstract V decodeValue(byte[] value);
/**
@@ -71,6 +75,7 @@ public abstract class BaseBinaryStateIterator<K, V> extends BaseStateIterator<K,
* @param value the value to check
* @return true if the value is tombstone, false otherwise
*/
+ @Override
protected abstract boolean isTombstoneValue(byte[] value);
}
diff --git a/storm-client/src/jvm/org/apache/storm/state/DefaultStateEncoder.java b/storm-client/src/jvm/org/apache/storm/state/DefaultStateEncoder.java
index b84d54b..42a9a4d 100644
--- a/storm-client/src/jvm/org/apache/storm/state/DefaultStateEncoder.java
+++ b/storm-client/src/jvm/org/apache/storm/state/DefaultStateEncoder.java
@@ -41,19 +41,23 @@ public class DefaultStateEncoder<K, V> implements StateEncoder<K, V, byte[], byt
return valueSerializer;
}
+ @Override
public byte[] encodeKey(K key) {
return keySerializer.serialize(key);
}
+ @Override
public byte[] encodeValue(V value) {
return internalValueSerializer.serialize(
Optional.of(valueSerializer.serialize(value)));
}
+ @Override
public K decodeKey(byte[] encodedKey) {
return keySerializer.deserialize(encodedKey);
}
+ @Override
public V decodeValue(byte[] encodedValue) {
Optional<byte[]> internalValue = internalValueSerializer.deserialize(encodedValue);
if (internalValue.isPresent()) {
diff --git a/storm-client/src/jvm/org/apache/storm/streams/operations/Reducer.java b/storm-client/src/jvm/org/apache/storm/streams/operations/Reducer.java
index e4d12cb..8f2f4bb 100644
--- a/storm-client/src/jvm/org/apache/storm/streams/operations/Reducer.java
+++ b/storm-client/src/jvm/org/apache/storm/streams/operations/Reducer.java
@@ -25,5 +25,6 @@ public interface Reducer<T> extends BiFunction<T, T, T> {
* @param arg2 the second argument
* @return the result
*/
+ @Override
T apply(T arg1, T arg2);
}
diff --git a/storm-client/src/jvm/org/apache/storm/task/ShellBolt.java b/storm-client/src/jvm/org/apache/storm/task/ShellBolt.java
index 6b2a11f..719b1a8 100644
--- a/storm-client/src/jvm/org/apache/storm/task/ShellBolt.java
+++ b/storm-client/src/jvm/org/apache/storm/task/ShellBolt.java
@@ -122,6 +122,7 @@ public class ShellBolt implements IBolt {
this.changeDirectory = changeDirectory;
}
+ @Override
public void prepare(Map<String, Object> topoConf, TopologyContext context,
final OutputCollector collector) {
if (ConfigUtils.isLocalMode(topoConf)) {
@@ -169,6 +170,7 @@ public class ShellBolt implements IBolt {
heartBeatExecutorService.scheduleAtFixedRate(new BoltHeartbeatTimerTask(this), 1, 1, TimeUnit.SECONDS);
}
+ @Override
public void execute(Tuple input) {
if (_exception != null) {
throw new RuntimeException(_exception);
@@ -197,6 +199,7 @@ public class ShellBolt implements IBolt {
return boltMsg;
}
+ @Override
public void cleanup() {
_running = false;
heartBeatExecutorService.shutdownNow();
@@ -323,6 +326,7 @@ public class ShellBolt implements IBolt {
}
private class BoltReaderRunnable implements Runnable {
+ @Override
public void run() {
while (_running) {
try {
@@ -367,6 +371,7 @@ public class ShellBolt implements IBolt {
}
private class BoltWriterRunnable implements Runnable {
+ @Override
public void run() {
while (_running) {
try {
diff --git a/storm-client/src/jvm/org/apache/storm/task/TopologyContext.java b/storm-client/src/jvm/org/apache/storm/task/TopologyContext.java
index d6461d4..c6a6371 100644
--- a/storm-client/src/jvm/org/apache/storm/task/TopologyContext.java
+++ b/storm-client/src/jvm/org/apache/storm/task/TopologyContext.java
@@ -316,6 +316,7 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo
* @return The IMetric argument unchanged.
*/
@Deprecated
+ @Override
public <T extends IMetric> T registerMetric(String name, T metric, int timeBucketSizeInSecs) {
if (_openOrPrepareWasCalled.get()) {
throw new RuntimeException("TopologyContext.registerMetric can only be called from within overridden " +
@@ -382,6 +383,7 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo
* Convenience method for registering ReducedMetric.
*/
@Deprecated
+ @Override
public ReducedMetric registerMetric(String name, IReducer reducer, int timeBucketSizeInSecs) {
return registerMetric(name, new ReducedMetric(reducer), timeBucketSizeInSecs);
}
@@ -390,6 +392,7 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo
* Convenience method for registering CombinedMetric.
*/
@Deprecated
+ @Override
public CombinedMetric registerMetric(String name, ICombiner combiner, int timeBucketSizeInSecs) {
return registerMetric(name, new CombinedMetric(combiner), timeBucketSizeInSecs);
}
diff --git a/storm-client/src/jvm/org/apache/storm/testing/BoltTracker.java b/storm-client/src/jvm/org/apache/storm/testing/BoltTracker.java
index 0bb3bf3..5a57536 100644
--- a/storm-client/src/jvm/org/apache/storm/testing/BoltTracker.java
+++ b/storm-client/src/jvm/org/apache/storm/testing/BoltTracker.java
@@ -26,6 +26,7 @@ public class BoltTracker extends NonRichBoltTracker implements IRichBolt {
_richDelegate = delegate;
}
+ @Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
_richDelegate.declareOutputFields(declarer);
}
diff --git a/storm-client/src/jvm/org/apache/storm/testing/FeederSpout.java b/storm-client/src/jvm/org/apache/storm/testing/FeederSpout.java
index 7f0239ca..5cce40c 100644
--- a/storm-client/src/jvm/org/apache/storm/testing/FeederSpout.java
+++ b/storm-client/src/jvm/org/apache/storm/testing/FeederSpout.java
@@ -60,14 +60,17 @@ public class FeederSpout extends BaseRichSpout {
InprocMessaging.waitForReader(_id);
}
+ @Override
public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
_collector = collector;
}
+ @Override
public void close() {
}
+ @Override
public void nextTuple() {
List<Object> toEmit = (List<Object>) InprocMessaging.pollMessage(_id);
if (toEmit != null) {
@@ -78,18 +81,21 @@ public class FeederSpout extends BaseRichSpout {
}
}
+ @Override
public void ack(Object msgId) {
if (_ackFailDelegate != null) {
_ackFailDelegate.ack(msgId);
}
}
+ @Override
public void fail(Object msgId) {
if (_ackFailDelegate != null) {
_ackFailDelegate.fail(msgId);
}
}
+ @Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(_outFields);
}
diff --git a/storm-client/src/jvm/org/apache/storm/testing/FixedTupleSpout.java b/storm-client/src/jvm/org/apache/storm/testing/FixedTupleSpout.java
index 9665ae3..8404f86 100644
--- a/storm-client/src/jvm/org/apache/storm/testing/FixedTupleSpout.java
+++ b/storm-client/src/jvm/org/apache/storm/testing/FixedTupleSpout.java
@@ -104,6 +104,7 @@ public class FixedTupleSpout implements IRichSpout, CompletableSpout {
}
}
+ @Override
public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
_context = context;
List<Integer> tasks = context.getComponentTasks(context.getThisComponentId());
@@ -121,9 +122,11 @@ public class FixedTupleSpout implements IRichSpout, CompletableSpout {
}
}
+ @Override
public void close() {
}
+ @Override
public void nextTuple() {
if (_serveTuples.size() > 0) {
FixedTuple ft = _serveTuples.remove(0);
@@ -133,6 +136,7 @@ public class FixedTupleSpout implements IRichSpout, CompletableSpout {
}
}
+ @Override
public void ack(Object msgId) {
synchronized (acked) {
int curr = get(acked, _id, 0);
@@ -140,6 +144,7 @@ public class FixedTupleSpout implements IRichSpout, CompletableSpout {
}
}
+ @Override
public void fail(Object msgId) {
synchronized (failed) {
int curr = get(failed, _id, 0);
diff --git a/storm-client/src/jvm/org/apache/storm/testing/NonRichBoltTracker.java b/storm-client/src/jvm/org/apache/storm/testing/NonRichBoltTracker.java
index 84bb322..f5d0327 100644
--- a/storm-client/src/jvm/org/apache/storm/testing/NonRichBoltTracker.java
+++ b/storm-client/src/jvm/org/apache/storm/testing/NonRichBoltTracker.java
@@ -30,16 +30,19 @@ public class NonRichBoltTracker implements IBolt {
_trackId = id;
}
+ @Override
public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
_delegate.prepare(topoConf, context, collector);
}
+ @Override
public void execute(Tuple input) {
_delegate.execute(input);
Map<String, Object> stats = (Map<String, Object>) RegisteredGlobalState.getState(_trackId);
((AtomicInteger) stats.get("processed")).incrementAndGet();
}
+ @Override
public void cleanup() {
_delegate.cleanup();
}
diff --git a/storm-client/src/jvm/org/apache/storm/testing/PythonShellMetricsBolt.java b/storm-client/src/jvm/org/apache/storm/testing/PythonShellMetricsBolt.java
index 76be626..48e08f1 100644
--- a/storm-client/src/jvm/org/apache/storm/testing/PythonShellMetricsBolt.java
+++ b/storm-client/src/jvm/org/apache/storm/testing/PythonShellMetricsBolt.java
@@ -37,6 +37,7 @@ public class PythonShellMetricsBolt extends ShellBolt implements IRichBolt {
super(command, file);
}
+ @Override
public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
super.prepare(topoConf, context, collector);
@@ -44,9 +45,11 @@ public class PythonShellMetricsBolt extends ShellBolt implements IRichBolt {
context.registerMetric("my-custom-shell-metric", cMetric, 5);
}
+ @Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
+ @Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
diff --git a/storm-client/src/jvm/org/apache/storm/testing/PythonShellMetricsSpout.java b/storm-client/src/jvm/org/apache/storm/testing/PythonShellMetricsSpout.java
index 9d2c581..e0795cb 100644
--- a/storm-client/src/jvm/org/apache/storm/testing/PythonShellMetricsSpout.java
+++ b/storm-client/src/jvm/org/apache/storm/testing/PythonShellMetricsSpout.java
@@ -46,10 +46,12 @@ public class PythonShellMetricsSpout extends ShellSpout implements IRichSpout {
context.registerMetric("my-custom-shellspout-metric", cMetric, 5);
}
+ @Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("field1"));
}
+ @Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
diff --git a/storm-client/src/jvm/org/apache/storm/testing/SingleUserSimpleTransport.java b/storm-client/src/jvm/org/apache/storm/testing/SingleUserSimpleTransport.java
index 762f0f1..d385de5 100644
--- a/storm-client/src/jvm/org/apache/storm/testing/SingleUserSimpleTransport.java
+++ b/storm-client/src/jvm/org/apache/storm/testing/SingleUserSimpleTransport.java
@@ -23,10 +23,12 @@ public class SingleUserSimpleTransport extends SimpleTransportPlugin {
protected Subject getDefaultSubject() {
HashSet<Principal> principals = new HashSet<Principal>();
principals.add(new Principal() {
+ @Override
public String getName() {
return "user";
}
+ @Override
public String toString() {
return "user";
}
diff --git a/storm-client/src/jvm/org/apache/storm/testing/SpoutTracker.java b/storm-client/src/jvm/org/apache/storm/testing/SpoutTracker.java
index 59833f2..88a48b6 100644
--- a/storm-client/src/jvm/org/apache/storm/testing/SpoutTracker.java
+++ b/storm-client/src/jvm/org/apache/storm/testing/SpoutTracker.java
@@ -35,31 +35,37 @@ public class SpoutTracker extends BaseRichSpout {
_trackId = trackId;
}
+ @Override
public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
_tracker = new SpoutTrackOutputCollector(collector);
_delegate.open(conf, context, new SpoutOutputCollector(_tracker));
}
+ @Override
public void close() {
_delegate.close();
}
+ @Override
public void nextTuple() {
_delegate.nextTuple();
}
+ @Override
public void ack(Object msgId) {
_delegate.ack(msgId);
Map<String, Object> stats = (Map<String, Object>) RegisteredGlobalState.getState(_trackId);
((AtomicInteger) stats.get("processed")).incrementAndGet();
}
+ @Override
public void fail(Object msgId) {
_delegate.fail(msgId);
Map<String, Object> stats = (Map<String, Object>) RegisteredGlobalState.getState(_trackId);
((AtomicInteger) stats.get("processed")).incrementAndGet();
}
+ @Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
_delegate.declareOutputFields(declarer);
}
@@ -79,12 +85,14 @@ public class SpoutTracker extends BaseRichSpout {
}
+ @Override
public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
List<Integer> ret = _collector.emit(streamId, tuple, messageId);
recordSpoutEmit();
return ret;
}
+ @Override
public void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId) {
_collector.emitDirect(taskId, streamId, tuple, messageId);
recordSpoutEmit();
diff --git a/storm-client/src/jvm/org/apache/storm/testing/TestAggregatesCounter.java b/storm-client/src/jvm/org/apache/storm/testing/TestAggregatesCounter.java
index 94cde43..8fff86d 100644
--- a/storm-client/src/jvm/org/apache/storm/testing/TestAggregatesCounter.java
+++ b/storm-client/src/jvm/org/apache/storm/testing/TestAggregatesCounter.java
@@ -32,11 +32,13 @@ public class TestAggregatesCounter extends BaseRichBolt {
Map<String, Integer> _counts;
OutputCollector _collector;
+ @Override
public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
_collector = collector;
_counts = new HashMap<String, Integer>();
}
+ @Override
public void execute(Tuple input) {
String word = (String) input.getValues().get(0);
int count = (Integer) input.getValues().get(1);
@@ -49,10 +51,12 @@ public class TestAggregatesCounter extends BaseRichBolt {
_collector.ack(input);
}
+ @Override
public void cleanup() {
}
+ @Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("agg-global"));
}
diff --git a/storm-client/src/jvm/org/apache/storm/testing/TestEventLogSpout.java b/storm-client/src/jvm/org/apache/storm/testing/TestEventLogSpout.java
index f079221..9418fbb 100644
--- a/storm-client/src/jvm/org/apache/storm/testing/TestEventLogSpout.java
+++ b/storm-client/src/jvm/org/apache/storm/testing/TestEventLogSpout.java
@@ -62,6 +62,7 @@ public class TestEventLogSpout extends BaseRichSpout implements CompletableSpout
}
}
+ @Override
public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
_collector = collector;
this.source = context.getThisTaskId();
@@ -69,6 +70,7 @@ public class TestEventLogSpout extends BaseRichSpout implements CompletableSpout
myCount = totalCount / taskCount;
}
+ @Override
public void close() {
}
@@ -101,6 +103,7 @@ public class TestEventLogSpout extends BaseRichSpout implements CompletableSpout
return false;
}
+ @Override
public void nextTuple() {
if (eventId < myCount) {
eventId++;
@@ -108,6 +111,7 @@ public class TestEventLogSpout extends BaseRichSpout implements CompletableSpout
}
}
+ @Override
public void ack(Object msgId) {
synchronized (acked) {
int curr = get(acked, uid, 0);
@@ -115,6 +119,7 @@ public class TestEventLogSpout extends BaseRichSpout implements CompletableSpout
}
}
+ @Override
public void fail(Object msgId) {
synchronized (failed) {
int curr = get(failed, uid, 0);
@@ -122,6 +127,7 @@ public class TestEventLogSpout extends BaseRichSpout implements CompletableSpout
}
}
+ @Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("source", "eventId"));
}
diff --git a/storm-client/src/jvm/org/apache/storm/testing/TestEventOrderCheckBolt.java b/storm-client/src/jvm/org/apache/storm/testing/TestEventOrderCheckBolt.java
index debd4d4..320620f 100644
--- a/storm-client/src/jvm/org/apache/storm/testing/TestEventOrderCheckBolt.java
+++ b/storm-client/src/jvm/org/apache/storm/testing/TestEventOrderCheckBolt.java
@@ -30,11 +30,13 @@ public class TestEventOrderCheckBolt extends BaseRichBolt {
Map<Integer, Long> recentEventId = new HashMap<Integer, Long>();
private int _count;
+ @Override
public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
_collector = collector;
_count = 0;
}
+ @Override
public void execute(Tuple input) {
Integer sourceId = input.getInteger(0);
Long eventId = input.getLong(1);
@@ -51,6 +53,7 @@ public class TestEventOrderCheckBolt extends BaseRichBolt {
_collector.ack(input);
}
+ @Override
public void cleanup() {
}
@@ -59,6 +62,7 @@ public class TestEventOrderCheckBolt extends BaseRichBolt {
return new Fields("error");
}
+ @Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("error"));
}
diff --git a/storm-client/src/jvm/org/apache/storm/testing/TestGlobalCount.java b/storm-client/src/jvm/org/apache/storm/testing/TestGlobalCount.java
index b429298..245b82c 100644
--- a/storm-client/src/jvm/org/apache/storm/testing/TestGlobalCount.java
+++ b/storm-client/src/jvm/org/apache/storm/testing/TestGlobalCount.java
@@ -29,17 +29,20 @@ public class TestGlobalCount extends BaseRichBolt {
OutputCollector _collector;
private int _count;
+ @Override
public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
_collector = collector;
_count = 0;
}
+ @Override
public void execute(Tuple input) {
_count++;
_collector.emit(input, new Values(_count));
_collector.ack(input);
}
+ @Override
public void cleanup() {
}
@@ -48,6 +51,7 @@ public class TestGlobalCount extends BaseRichBolt {
return new Fields("global-count");
}
+ @Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("global-count"));
}
diff --git a/storm-client/src/jvm/org/apache/storm/testing/TestKryoDecorator.java b/storm-client/src/jvm/org/apache/storm/testing/TestKryoDecorator.java
index f2f03e9..7af7eba 100644
--- a/storm-client/src/jvm/org/apache/storm/testing/TestKryoDecorator.java
+++ b/storm-client/src/jvm/org/apache/storm/testing/TestKryoDecorator.java
@@ -17,6 +17,7 @@ import org.apache.storm.serialization.IKryoDecorator;
public class TestKryoDecorator implements IKryoDecorator {
+ @Override
public void decorate(Kryo k) {
k.register(TestSerObject.class);
}
diff --git a/storm-client/src/jvm/org/apache/storm/testing/TestPlannerBolt.java b/storm-client/src/jvm/org/apache/storm/testing/TestPlannerBolt.java
index 3051970..d99d405 100644
--- a/storm-client/src/jvm/org/apache/storm/testing/TestPlannerBolt.java
+++ b/storm-client/src/jvm/org/apache/storm/testing/TestPlannerBolt.java
@@ -22,10 +22,12 @@ import org.apache.storm.tuple.Tuple;
public class TestPlannerBolt extends BaseRichBolt {
+ @Override
public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
}
+ @Override
public void execute(Tuple input) {
}
@@ -34,6 +36,7 @@ public class TestPlannerBolt extends BaseRichBolt {
return new Fields("field1", "field2");
}
+ @Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(getOutputFields());
}
diff --git a/storm-client/src/jvm/org/apache/storm/testing/TestPlannerSpout.java b/storm-client/src/jvm/org/apache/storm/testing/TestPlannerSpout.java
index 8f0368a..999f068 100644
--- a/storm-client/src/jvm/org/apache/storm/testing/TestPlannerSpout.java
+++ b/storm-client/src/jvm/org/apache/storm/testing/TestPlannerSpout.java
@@ -45,26 +45,32 @@ public class TestPlannerSpout extends BaseRichSpout {
}
+ @Override
public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
}
+ @Override
public void close() {
}
+ @Override
public void nextTuple() {
Utils.sleep(100);
}
+ @Override
public void ack(Object msgId) {
}
+ @Override
public void fail(Object msgId) {
}
+ @Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(getOutputFields());
}
diff --git a/storm-client/src/jvm/org/apache/storm/testing/TestWordCounter.java b/storm-client/src/jvm/org/apache/storm/testing/TestWordCounter.java
index 4244ad3..e10e066 100644
--- a/storm-client/src/jvm/org/apache/storm/testing/TestWordCounter.java
+++ b/storm-client/src/jvm/org/apache/storm/testing/TestWordCounter.java
@@ -31,6 +31,7 @@ public class TestWordCounter extends BaseBasicBolt {
Map<String, Integer> _counts;
+ @Override
public void prepare(Map<String, Object> topoConf, TopologyContext context) {
_counts = new HashMap<String, Integer>();
}
@@ -39,6 +40,7 @@ public class TestWordCounter extends BaseBasicBolt {
return (String) t.getValues().get(idx);
}
+ @Override
public void execute(Tuple input, BasicOutputCollector collector) {
String word = getTupleValue(input, 0);
int count = 0;
@@ -50,10 +52,12 @@ public class TestWordCounter extends BaseBasicBolt {
collector.emit(tuple(word, count));
}
+ @Override
public void cleanup() {
}
+ @Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word", "count"));
}
diff --git a/storm-client/src/jvm/org/apache/storm/testing/TestWordSpout.java b/storm-client/src/jvm/org/apache/storm/testing/TestWordSpout.java
index 02613de..2c86c68 100644
--- a/storm-client/src/jvm/org/apache/storm/testing/TestWordSpout.java
+++ b/storm-client/src/jvm/org/apache/storm/testing/TestWordSpout.java
@@ -40,14 +40,17 @@ public class TestWordSpout extends BaseRichSpout {
_isDistributed = isDistributed;
}
+ @Override
public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
_collector = collector;
}
+ @Override
public void close() {
}
+ @Override
public void nextTuple() {
Utils.sleep(100);
final String[] words = new String[]{ "nathan", "mike", "jackson", "golda", "bertels" };
@@ -56,14 +59,17 @@ public class TestWordSpout extends BaseRichSpout {
_collector.emit(new Values(word));
}
+ @Override
public void ack(Object msgId) {
}
+ @Override
public void fail(Object msgId) {
}
+ @Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
diff --git a/storm-client/src/jvm/org/apache/storm/testing/TupleCaptureBolt.java b/storm-client/src/jvm/org/apache/storm/testing/TupleCaptureBolt.java
index 1829f5e..13b5b4e 100644
--- a/storm-client/src/jvm/org/apache/storm/testing/TupleCaptureBolt.java
+++ b/storm-client/src/jvm/org/apache/storm/testing/TupleCaptureBolt.java
@@ -35,10 +35,12 @@ public class TupleCaptureBolt implements IRichBolt {
emitted_tuples.put(_name, new HashMap<String, List<FixedTuple>>());
}
+ @Override
public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
_collector = collector;
}
+ @Override
public void execute(Tuple input) {
String component = input.getSourceComponent();
Map<String, List<FixedTuple>> captured = emitted_tuples.get(_name);
@@ -53,6 +55,7 @@ public class TupleCaptureBolt implements IRichBolt {
return emitted_tuples.get(_name);
}
+ @Override
public void cleanup() {
}
diff --git a/storm-client/src/jvm/org/apache/storm/topology/BasicBoltExecutor.java b/storm-client/src/jvm/org/apache/storm/topology/BasicBoltExecutor.java
index b423d08..a7a3a9e 100644
--- a/storm-client/src/jvm/org/apache/storm/topology/BasicBoltExecutor.java
+++ b/storm-client/src/jvm/org/apache/storm/topology/BasicBoltExecutor.java
@@ -29,16 +29,19 @@ public class BasicBoltExecutor implements IRichBolt {
_bolt = bolt;
}
+ @Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
_bolt.declareOutputFields(declarer);
}
+ @Override
public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
_bolt.prepare(topoConf, context);
_collector = new BasicOutputCollector(collector);
}
+ @Override
public void execute(Tuple input) {
_collector.setContext(input);
try {
@@ -52,10 +55,12 @@ public class BasicBoltExecutor implements IRichBolt {
}
}
+ @Override
public void cleanup() {
_bolt.cleanup();
}
+ @Override
public Map<String, Object> getComponentConfiguration() {
return _bolt.getComponentConfiguration();
}
diff --git a/storm-client/src/jvm/org/apache/storm/topology/BasicOutputCollector.java b/storm-client/src/jvm/org/apache/storm/topology/BasicOutputCollector.java
index 2f5ba2d..8ff4999 100644
--- a/storm-client/src/jvm/org/apache/storm/topology/BasicOutputCollector.java
+++ b/storm-client/src/jvm/org/apache/storm/topology/BasicOutputCollector.java
@@ -27,6 +27,7 @@ public class BasicOutputCollector implements IBasicOutputCollector {
this.out = out;
}
+ @Override
public List<Integer> emit(String streamId, List<Object> tuple) {
return out.emit(streamId, inputTuple, tuple);
}
@@ -39,6 +40,7 @@ public class BasicOutputCollector implements IBasicOutputCollector {
this.inputTuple = inputTuple;
}
+ @Override
public void emitDirect(int taskId, String streamId, List<Object> tuple) {
out.emitDirect(taskId, streamId, inputTuple, tuple);
}
@@ -53,6 +55,7 @@ public class BasicOutputCollector implements IBasicOutputCollector {
*
* @param tuple the tuple to reset timeout for
*/
+ @Override
public void resetTimeout(Tuple tuple) {
out.resetTimeout(tuple);
}
@@ -61,6 +64,7 @@ public class BasicOutputCollector implements IBasicOutputCollector {
return out;
}
+ @Override
public void reportError(Throwable t) {
out.reportError(t);
}
diff --git a/storm-client/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java b/storm-client/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java
index ad56d50..9ab5f83 100644
--- a/storm-client/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java
+++ b/storm-client/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java
@@ -67,6 +67,7 @@ public class CheckpointTupleForwarder extends BaseStatefulBoltExecutor {
* @param action the action (prepare, commit, rollback or initstate)
* @param txid the transaction id.
*/
+ @Override
protected void handleCheckpoint(Tuple checkpointTuple, Action action, long txid) {
collector.emit(CHECKPOINT_STREAM_ID, checkpointTuple, new Values(txid, action));
collector.ack(checkpointTuple);
@@ -82,6 +83,7 @@ public class CheckpointTupleForwarder extends BaseStatefulBoltExecutor {
*
* @param input the input tuple
*/
+ @Override
protected void handleTuple(Tuple input) {
bolt.execute(input);
}
diff --git a/storm-client/src/jvm/org/apache/storm/topology/OutputFieldsGetter.java b/storm-client/src/jvm/org/apache/storm/topology/OutputFieldsGetter.java
index 8849175..1cd268c 100644
--- a/storm-client/src/jvm/org/apache/storm/topology/OutputFieldsGetter.java
+++ b/storm-client/src/jvm/org/apache/storm/topology/OutputFieldsGetter.java
@@ -21,18 +21,22 @@ import org.apache.storm.utils.Utils;
public class OutputFieldsGetter implements OutputFieldsDeclarer {
private Map<String, StreamInfo> _fields = new HashMap<>();
+ @Override
public void declare(Fields fields) {
declare(false, fields);
}
+ @Override
public void declare(boolean direct, Fields fields) {
declareStream(Utils.DEFAULT_STREAM_ID, direct, fields);
}
+ @Override
public void declareStream(String streamId, Fields fields) {
declareStream(streamId, false, fields);
}
+ @Override
public void declareStream(String streamId, boolean direct, Fields fields) {
if (null == streamId) {
throw new IllegalArgumentException("streamId can't be null");
diff --git a/storm-client/src/jvm/org/apache/storm/trident/fluent/ChainedAggregatorDeclarer.java b/storm-client/src/jvm/org/apache/storm/trident/fluent/ChainedAggregatorDeclarer.java
index 62c88a0..51d8ab7 100644
--- a/storm-client/src/jvm/org/apache/storm/trident/fluent/ChainedAggregatorDeclarer.java
+++ b/storm-client/src/jvm/org/apache/storm/trident/fluent/ChainedAggregatorDeclarer.java
@@ -41,6 +41,7 @@ public class ChainedAggregatorDeclarer implements ChainedFullAggregatorDeclarer,
_globalScheme = globalScheme;
}
+ @Override
public Stream chainEnd() {
Fields[] inputFields = new Fields[_aggs.size()];
Aggregator[] aggs = new Aggregator[_aggs.size()];
@@ -89,37 +90,45 @@ public class ChainedAggregatorDeclarer implements ChainedFullAggregatorDeclarer,
return _stream.toStream();
}
+ @Override
public ChainedPartitionAggregatorDeclarer partitionAggregate(Aggregator agg, Fields functionFields) {
return partitionAggregate(null, agg, functionFields);
}
+ @Override
public ChainedPartitionAggregatorDeclarer partitionAggregate(Fields inputFields, Aggregator agg, Fields functionFields) {
_type = AggType.PARTITION;
_aggs.add(new AggSpec(inputFields, agg, functionFields));
return this;
}
+ @Override
public ChainedPartitionAggregatorDeclarer partitionAggregate(CombinerAggregator agg, Fields functionFields) {
return partitionAggregate(null, agg, functionFields);
}
+ @Override
public ChainedPartitionAggregatorDeclarer partitionAggregate(Fields inputFields, CombinerAggregator agg, Fields functionFields) {
initCombiner(inputFields, agg, functionFields);
return partitionAggregate(functionFields, new CombinerAggregatorCombineImpl(agg), functionFields);
}
+ @Override
public ChainedPartitionAggregatorDeclarer partitionAggregate(ReducerAggregator agg, Fields functionFields) {
return partitionAggregate(null, agg, functionFields);
}
+ @Override
public ChainedPartitionAggregatorDeclarer partitionAggregate(Fields inputFields, ReducerAggregator agg, Fields functionFields) {
return partitionAggregate(inputFields, new ReducerAggregatorImpl(agg), functionFields);
}
+ @Override
public ChainedFullAggregatorDeclarer aggregate(Aggregator agg, Fields functionFields) {
return aggregate(null, agg, functionFields);
}
+ @Override
public ChainedFullAggregatorDeclarer aggregate(Fields inputFields, Aggregator agg, Fields functionFields) {
return aggregate(inputFields, agg, functionFields, false);
}
@@ -136,19 +145,23 @@ public class ChainedAggregatorDeclarer implements ChainedFullAggregatorDeclarer,
return this;
}
+ @Override
public ChainedFullAggregatorDeclarer aggregate(CombinerAggregator agg, Fields functionFields) {
return aggregate(null, agg, functionFields);
}
+ @Override
public ChainedFullAggregatorDeclarer aggregate(Fields inputFields, CombinerAggregator agg, Fields functionFields) {
initCombiner(inputFields, agg, functionFields);
return aggregate(functionFields, new CombinerAggregatorCombineImpl(agg), functionFields, true);
}
+ @Override
public ChainedFullAggregatorDeclarer aggregate(ReducerAggregator agg, Fields functionFields) {
return aggregate(null, agg, functionFields);
}
+ @Override
public ChainedFullAggregatorDeclarer aggregate(Fields inputFields, ReducerAggregator agg, Fields functionFields) {
return aggregate(inputFields, new ReducerAggregatorImpl(agg), functionFields);
}
diff --git a/storm-client/src/jvm/org/apache/storm/trident/operation/TridentOperationContext.java b/storm-client/src/jvm/org/apache/storm/trident/operation/TridentOperationContext.java
index e08d4f3..473eda6 100644
--- a/storm-client/src/jvm/org/apache/storm/trident/operation/TridentOperationContext.java
+++ b/storm-client/src/jvm/org/apache/storm/trident/operation/TridentOperationContext.java
@@ -53,14 +53,17 @@ public class TridentOperationContext implements IMetricsContext {
return _topoContext.getThisTaskIndex();
}
+ @Override
public <T extends IMetric> T registerMetric(String name, T metric, int timeBucketSizeInSecs) {
return _topoContext.registerMetric(name, metric, timeBucketSizeInSecs);
}
+ @Override
public ReducedMetric registerMetric(String name, IReducer reducer, int timeBucketSizeInSecs) {
return _topoContext.registerMetric(name, new ReducedMetric(reducer), timeBucketSizeInSecs);
}
+ @Override
public CombinedMetric registerMetric(String name, ICombiner combiner, int timeBucketSizeInSecs) {
return _topoContext.registerMetric(name, new CombinedMetric(combiner), timeBucketSizeInSecs);
}
diff --git a/storm-client/src/jvm/org/apache/storm/trident/operation/impl/ChainedAggregatorImpl.java b/storm-client/src/jvm/org/apache/storm/trident/operation/impl/ChainedAggregatorImpl.java
index b2a7e29..6a2ce19 100644
--- a/storm-client/src/jvm/org/apache/storm/trident/operation/impl/ChainedAggregatorImpl.java
+++ b/storm-client/src/jvm/org/apache/storm/trident/operation/impl/ChainedAggregatorImpl.java
@@ -39,6 +39,7 @@ public class ChainedAggregatorImpl implements Aggregator<ChainedResult> {
}
}
+ @Override
public void prepare(Map<String, Object> conf, TridentOperationContext context) {
_inputFactories = new ProjectionFactory[_inputFields.length];
for (int i = 0; i < _inputFields.length; i++) {
@@ -47,6 +48,7 @@ public class ChainedAggregatorImpl implements Aggregator<ChainedResult> {
}
}
+ @Override
public ChainedResult init(Object batchId, TridentCollector collector) {
ChainedResult initted = new ChainedResult(collector, _aggs.length);
for (int i = 0; i < _aggs.length; i++) {
@@ -55,6 +57,7 @@ public class ChainedAggregatorImpl implements Aggregator<ChainedResult> {
return initted;
}
+ @Override
public void aggregate(ChainedResult val, TridentTuple tuple, TridentCollector collector) {
val.setFollowThroughCollector(collector);
for (int i = 0; i < _aggs.length; i++) {
@@ -63,6 +66,7 @@ public class ChainedAggregatorImpl implements Aggregator<ChainedResult> {
}
}
+ @Override
public void complete(ChainedResult val, TridentCollector collector) {
val.setFollowThroughCollector(collector);
for (int i = 0; i < _aggs.length; i++) {
@@ -101,6 +105,7 @@ public class ChainedAggregatorImpl implements Aggregator<ChainedResult> {
return true;
}
+ @Override
public void cleanup() {
for (Aggregator a : _aggs) {
a.cleanup();
diff --git a/storm-client/src/jvm/org/apache/storm/trident/operation/impl/CombinerAggregatorCombineImpl.java b/storm-client/src/jvm/org/apache/storm/trident/operation/impl/CombinerAggregatorCombineImpl.java
index 5813f32..7525ea5 100644
--- a/storm-client/src/jvm/org/apache/storm/trident/operation/impl/CombinerAggregatorCombineImpl.java
+++ b/storm-client/src/jvm/org/apache/storm/trident/operation/impl/CombinerAggregatorCombineImpl.java
@@ -27,16 +27,19 @@ public class CombinerAggregatorCombineImpl implements Aggregator<Result> {
_agg = agg;
}
+ @Override
public void prepare(Map<String, Object> conf, TridentOperationContext context) {
}
+ @Override
public Result init(Object batchId, TridentCollector collector) {
Result ret = new Result();
ret.obj = _agg.zero();
return ret;
}
+ @Override
public void aggregate(Result val, TridentTuple tuple, TridentCollector collector) {
Object v = tuple.getValue(0);
if (val.obj == null) {
@@ -46,10 +49,12 @@ public class CombinerAggregatorCombineImpl implements Aggregator<Result> {
}
}
+ @Override
public void complete(Result val, TridentCollector collector) {
collector.emit(new Values(val.obj));
}
+ @Override
public void cleanup() {
}
diff --git a/storm-client/src/jvm/org/apache/storm/trident/operation/impl/ReducerAggregatorImpl.java b/storm-client/src/jvm/org/apache/storm/trident/operation/impl/ReducerAggregatorImpl.java
index 5ea0482..0b6a507 100644
--- a/storm-client/src/jvm/org/apache/storm/trident/operation/impl/ReducerAggregatorImpl.java
+++ b/storm-client/src/jvm/org/apache/storm/trident/operation/impl/ReducerAggregatorImpl.java
@@ -27,24 +27,29 @@ public class ReducerAggregatorImpl implements Aggregator<Result> {
_agg = agg;
}
+ @Override
public void prepare(Map<String, Object> conf, TridentOperationContext context) {
}
+ @Override
public Result init(Object batchId, TridentCollector collector) {
Result ret = new Result();
ret.obj = _agg.init();
return ret;
}
+ @Override
public void aggregate(Result val, TridentTuple tuple, TridentCollector collector) {
val.obj = _agg.reduce(val.obj, tuple);
}
+ @Override
public void complete(Result val, TridentCollector collector) {
collector.emit(new Values(val.obj));
}
+ @Override
public void cleanup() {
}
diff --git a/storm-client/src/jvm/org/apache/storm/trident/testing/FeederBatchSpout.java b/storm-client/src/jvm/org/apache/storm/trident/testing/FeederBatchSpout.java
index e021e7b..236ef48 100644
--- a/storm-client/src/jvm/org/apache/storm/trident/testing/FeederBatchSpout.java
+++ b/storm-client/src/jvm/org/apache/storm/trident/testing/FeederBatchSpout.java
@@ -45,6 +45,7 @@ public class FeederBatchSpout implements ITridentSpout<Map<Integer, List<List<Ob
_waitToEmit = trueIfWait;
}
+ @Override
public void feed(Object tuples) {
Semaphore sem = new Semaphore(0);
((List) RegisteredGlobalState.getState(_semaphoreId)).add(sem);
diff --git a/storm-client/src/jvm/org/apache/storm/trident/testing/LRUMemoryMapState.java b/storm-client/src/jvm/org/apache/storm/trident/testing/LRUMemoryMapState.java
index 104ebaf..2b12a72 100644
--- a/storm-client/src/jvm/org/apache/storm/trident/testing/LRUMemoryMapState.java
+++ b/storm-client/src/jvm/org/apache/storm/trident/testing/LRUMemoryMapState.java
@@ -43,38 +43,47 @@ public class LRUMemoryMapState<T> implements Snapshottable<T>, ITupleCollection,
_delegate = new SnapshottableMap(OpaqueMap.build(_backing), new Values("$MEMORY-MAP-STATE-GLOBAL$"));
}
+ @Override
public T update(ValueUpdater updater) {
return _delegate.update(updater);
}
+ @Override
public void set(T o) {
_delegate.set(o);
}
+ @Override
public T get() {
return _delegate.get();
}
+ @Override
public void beginCommit(Long txid) {
_delegate.beginCommit(txid);
}
+ @Override
public void commit(Long txid) {
_delegate.commit(txid);
}
+ @Override
public Iterator<List<Object>> getTuples() {
return _backing.getTuples();
}
+ @Override
public List<T> multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters) {
return _delegate.multiUpdate(keys, updaters);
}
+ @Override
public void multiPut(List<List<Object>> keys, List<T> vals) {
_delegate.multiPut(keys, vals);
}
+ @Override
public List<T> multiGet(List<List<Object>> keys) {
return _delegate.multiGet(keys);
}
@@ -135,10 +144,12 @@ public class LRUMemoryMapState<T> implements Snapshottable<T>, ITupleCollection,
private Iterator<Map.Entry<List<Object>, T>> it = db.entrySet().iterator();
+ @Override
public boolean hasNext() {
return it.hasNext();
}
+ @Override
public List<Object> next() {
Map.Entry<List<Object>, T> e = it.next();
List<Object> ret = new ArrayList<Object>();
@@ -147,6 +158,7 @@ public class LRUMemoryMapState<T> implements Snapshottable<T>, ITupleCollection,
return ret;
}
+ @Override
public void remove() {
throw new UnsupportedOperationException("Not supported yet.");
}
diff --git a/storm-client/src/jvm/org/apache/storm/trident/testing/MemoryMapState.java b/storm-client/src/jvm/org/apache/storm/trident/testing/MemoryMapState.java
index cd6f099..b3aed7b 100644
--- a/storm-client/src/jvm/org/apache/storm/trident/testing/MemoryMapState.java
+++ b/storm-client/src/jvm/org/apache/storm/trident/testing/MemoryMapState.java
@@ -46,18 +46,22 @@ public class MemoryMapState<T> implements Snapshottable<T>, ITupleCollection, Ma
_delegate = new SnapshottableMap(OpaqueMap.build(_backing), new Values("$MEMORY-MAP-STATE-GLOBAL$"));
}
+ @Override
public T update(ValueUpdater updater) {
return _delegate.update(updater);
}
+ @Override
public void set(T o) {
_delegate.set(o);
}
+ @Override
public T get() {
return _delegate.get();
}
+ @Override
public void beginCommit(Long txid) {
_delegate.beginCommit(txid);
if (txid == null || !txid.equals(_currTx)) {
@@ -67,22 +71,27 @@ public class MemoryMapState<T> implements Snapshottable<T>, ITupleCollection, Ma
_currTx = txid;
}
+ @Override
public void commit(Long txid) {
_delegate.commit(txid);
}
+ @Override
public Iterator<List<Object>> getTuples() {
return _backing.getTuples();
}
+ @Override
public List<T> multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters) {
return _delegate.multiUpdate(keys, updaters);
}
+ @Override
public void multiPut(List<List<Object>> keys, List<T> vals) {
_delegate.multiPut(keys, vals);
}
+ @Override
public List<T> multiGet(List<List<Object>> keys) {
return _delegate.multiGet(keys);
}
@@ -159,10 +168,12 @@ public class MemoryMapState<T> implements Snapshottable<T>, ITupleCollection, Ma
private Iterator<Map.Entry<List<Object>, T>> it = db.entrySet().iterator();
+ @Override
public boolean hasNext() {
return it.hasNext();
}
+ @Override
public List<Object> next() {
Map.Entry<List<Object>, T> e = it.next();
List<Object> ret = new ArrayList<Object>();
@@ -171,6 +182,7 @@ public class MemoryMapState<T> implements Snapshottable<T>, ITupleCollection, Ma
return ret;
}
+ @Override
public void remove() {
throw new UnsupportedOperationException("Not supported yet.");
}
diff --git a/storm-client/src/jvm/org/apache/storm/trident/topology/TransactionAttempt.java b/storm-client/src/jvm/org/apache/storm/trident/topology/TransactionAttempt.java
index 0d30a68..1f90b38 100644
--- a/storm-client/src/jvm/org/apache/storm/trident/topology/TransactionAttempt.java
+++ b/storm-client/src/jvm/org/apache/storm/trident/topology/TransactionAttempt.java
@@ -34,10 +34,12 @@ public class TransactionAttempt implements IBatchID {
return _txid;
}
+ @Override
public Object getId() {
return _txid;
}
+ @Override
public int getAttemptId() {
return _attemptId;
}
diff --git a/storm-client/src/jvm/org/apache/storm/trident/topology/TridentBoltExecutor.java b/storm-client/src/jvm/org/apache/storm/trident/topology/TridentBoltExecutor.java
index 036bf7f..e3500b2 100644
--- a/storm-client/src/jvm/org/apache/storm/trident/topology/TridentBoltExecutor.java
+++ b/storm-client/src/jvm/org/apache/storm/trident/topology/TridentBoltExecutor.java
@@ -400,25 +400,30 @@ public class TridentBoltExecutor implements IRichBolt {
_currBatch = batch;
}
+ @Override
public List<Integer> emit(String stream, Collection<Tuple> anchors, List<Object> tuple) {
List<Integer> tasks = _delegate.emit(stream, anchors, tuple);
updateTaskCounts(tasks);
return tasks;
}
+ @Override
public void emitDirect(int task, String stream, Collection<Tuple> anchors, List<Object> tuple) {
updateTaskCounts(Arrays.asList(task));
_delegate.emitDirect(task, stream, anchors, tuple);
}
+ @Override
public void ack(Tuple tuple) {
throw new IllegalStateException("Method should never be called");
}
+ @Override
public void fail(Tuple tuple) {
throw new IllegalStateException("Method should never be called");
}
+ @Override
public void resetTimeout(Tuple tuple) {
throw new IllegalStateException("Method should never be called");
}
@@ -428,6 +433,7 @@ public class TridentBoltExecutor implements IRichBolt {
_delegate.flush();
}
+ @Override
public void reportError(Throwable error) {
_delegate.reportError(error);
}
diff --git a/storm-client/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java b/storm-client/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java
index 1f5d0f0..79d64a1 100644
--- a/storm-client/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java
+++ b/storm-client/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java
@@ -138,10 +138,12 @@ public abstract class AbstractTridentWindowManager<T> implements ITridentWindowM
*/
protected abstract List<TridentTuple> getTridentTuples(List<T> tupleEvents);
+ @Override
public Queue<TriggerResult> getPendingTriggers() {
return pendingTriggers;
}
+ @Override
public void shutdown() {
try {
LOG.info("window manager [{}] is being shutdown", windowManager);
diff --git a/storm-client/src/jvm/org/apache/storm/trident/windowing/InMemoryTridentWindowManager.java b/storm-client/src/jvm/org/apache/storm/trident/windowing/InMemoryTridentWindowManager.java
index ff5d9bf..6be4be4 100644
--- a/storm-client/src/jvm/org/apache/storm/trident/windowing/InMemoryTridentWindowManager.java
+++ b/storm-client/src/jvm/org/apache/storm/trident/windowing/InMemoryTridentWindowManager.java
@@ -47,6 +47,7 @@ public class InMemoryTridentWindowManager extends AbstractTridentWindowManager<T
LOG.debug("InMemoryTridentWindowManager.onTuplesExpired");
}
+ @Override
public void addTuplesBatch(Object batchId, List<TridentTuple> tuples) {
LOG.debug("Adding tuples to window-manager for batch: [{}]", batchId);
for (TridentTuple tridentTuple : tuples) {
diff --git a/storm-client/src/jvm/org/apache/storm/trident/windowing/StoreBasedTridentWindowManager.java b/storm-client/src/jvm/org/apache/storm/trident/windowing/StoreBasedTridentWindowManager.java
index 98aba26..f07d3f1 100644
--- a/storm-client/src/jvm/org/apache/storm/trident/windowing/StoreBasedTridentWindowManager.java
+++ b/storm-client/src/jvm/org/apache/storm/trident/windowing/StoreBasedTridentWindowManager.java
@@ -52,6 +52,7 @@ public class StoreBasedTridentWindowManager extends AbstractTridentWindowManager
windowTupleTaskId = TUPLE_PREFIX + windowTaskId;
}
+ @Override
protected void initialize() {
// get existing tuples and pending/unsuccessful triggers for this operator-component/task and add them to WindowManager
@@ -119,6 +120,7 @@ public class StoreBasedTridentWindowManager extends AbstractTridentWindowManager
return key.substring(secondLastSepIndex + 1, lastSepIndex);
}
+ @Override
public void addTuplesBatch(Object batchId, List<TridentTuple> tuples) {
LOG.debug("Adding tuples to window-manager for batch: [{}]", batchId);
List<WindowsStore.Entry> entries = new ArrayList<>();
@@ -159,6 +161,7 @@ public class StoreBasedTridentWindowManager extends AbstractTridentWindowManager
return windowTupleTaskId + getBatchTxnId(batchId) + WindowsStore.KEY_SEPARATOR;
}
+ @Override
public List<TridentTuple> getTridentTuples(List<TridentBatchTuple> tridentBatchTuples) {
List<TridentTuple> resultTuples = new ArrayList<>();
List<String> keys = new ArrayList<>();
@@ -188,6 +191,7 @@ public class StoreBasedTridentWindowManager extends AbstractTridentWindowManager
return null;
}
+ @Override
public void onTuplesExpired(List<TridentBatchTuple> expiredTuples) {
if (maxCachedTuplesSize != null) {
currentCachedTuplesSize.addAndGet(-expiredTuples.size());
diff --git a/storm-client/src/jvm/org/apache/storm/trident/windowing/config/BaseWindowConfig.java b/storm-client/src/jvm/org/apache/storm/trident/windowing/config/BaseWindowConfig.java
index 9543934..72794fd 100644
--- a/storm-client/src/jvm/org/apache/storm/trident/windowing/config/BaseWindowConfig.java
+++ b/storm-client/src/jvm/org/apache/storm/trident/windowing/config/BaseWindowConfig.java
@@ -34,6 +34,7 @@ public abstract class BaseWindowConfig implements WindowConfig {
return slideLength;
}
+ @Override
public void validate() {
if (slideLength > windowLength) {
throw new IllegalArgumentException(
diff --git a/storm-client/src/jvm/org/apache/storm/tuple/Fields.java b/storm-client/src/jvm/org/apache/storm/tuple/Fields.java
index 5661516..bdd80b7 100644
--- a/storm-client/src/jvm/org/apache/storm/tuple/Fields.java
+++ b/storm-client/src/jvm/org/apache/storm/tuple/Fields.java
@@ -81,6 +81,7 @@ public class Fields implements Iterable<String>, Serializable {
return _fields.get(index);
}
+ @Override
public Iterator<String> iterator() {
return _fields.iterator();
}
diff --git a/storm-client/src/jvm/org/apache/storm/utils/DRPCClient.java b/storm-client/src/jvm/org/apache/storm/utils/DRPCClient.java
index d949f93..85a4c4b 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/DRPCClient.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/DRPCClient.java
@@ -119,6 +119,7 @@ public class DRPCClient extends ThriftClient implements DistributedRPC.Iface {
return port;
}
+ @Override
public String execute(String func, String args) throws TException, DRPCExecutionException, AuthorizationException {
if (func == null) {
throw new IllegalArgumentException("DRPC Function cannot be null");
diff --git a/storm-client/src/jvm/org/apache/storm/utils/DefaultShellLogHandler.java b/storm-client/src/jvm/org/apache/storm/utils/DefaultShellLogHandler.java
index fc196e5..f0f3c7a 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/DefaultShellLogHandler.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/DefaultShellLogHandler.java
@@ -47,6 +47,7 @@ public class DefaultShellLogHandler implements ShellLogHandler {
* @param context - the current {@link TopologyContext}.
* @see {@link ShellLogHandler#setUpContext}
*/
+ @Override
public void setUpContext(final Class<?> ownerCls, final ShellProcess process,
final TopologyContext context) {
this.log = getLogger(ownerCls);
@@ -61,6 +62,7 @@ public class DefaultShellLogHandler implements ShellLogHandler {
* @param shellMsg - the {@link ShellMsg} to log.
* @see {@link ShellLogHandler#log}
*/
+ @Override
public void log(final ShellMsg shellMsg) {
if (shellMsg == null) {
throw new IllegalArgumentException("shellMsg is required");
diff --git a/storm-client/src/jvm/org/apache/storm/utils/TimeCacheMap.java b/storm-client/src/jvm/org/apache/storm/utils/TimeCacheMap.java
index b5af684..429f3d2 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/TimeCacheMap.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/TimeCacheMap.java
@@ -42,6 +42,7 @@ public class TimeCacheMap<K, V> {
final long expirationMillis = expirationSecs * 1000L;
final long sleepTime = expirationMillis / (numBuckets - 1);
_cleaner = new Thread(new Runnable() {
+ @Override
public void run() {
try {
while (true) {
diff --git a/storm-client/src/jvm/org/apache/storm/windowing/WatermarkCountEvictionPolicy.java b/storm-client/src/jvm/org/apache/storm/windowing/WatermarkCountEvictionPolicy.java
index b820e59..b76f605 100644
--- a/storm-client/src/jvm/org/apache/storm/windowing/WatermarkCountEvictionPolicy.java
+++ b/storm-client/src/jvm/org/apache/storm/windowing/WatermarkCountEvictionPolicy.java
@@ -32,6 +32,7 @@ public class WatermarkCountEvictionPolicy<T> implements EvictionPolicy<T, Pair<L
currentCount = new AtomicLong();
}
+ @Override
public Action evict(Event<T> event) {
if (getContext() == null) {
//It is possible to get asked about eviction before we have a context, due to WindowManager.compactWindow.
diff --git a/storm-client/test/jvm/org/apache/storm/PaceMakerStateStorageFactoryTest.java b/storm-client/test/jvm/org/apache/storm/PaceMakerStateStorageFactoryTest.java
index 63d8cdc..ce567bf 100644
--- a/storm-client/test/jvm/org/apache/storm/PaceMakerStateStorageFactoryTest.java
+++ b/storm-client/test/jvm/org/apache/storm/PaceMakerStateStorageFactoryTest.java
@@ -147,10 +147,12 @@ public class PaceMakerStateStorageFactoryTest {
return clientMock;
}
+ @Override
public HBMessage send(HBMessage m) throws PacemakerConnectionException, InterruptedException {
return clientMock.send(m);
}
+ @Override
public List<HBMessage> sendAll(HBMessage m) throws PacemakerConnectionException, InterruptedException {
List<HBMessage> response = new ArrayList<>();
response.add(clientMock.send(m));
diff --git a/storm-client/test/jvm/org/apache/storm/bolt/TestJoinBolt.java b/storm-client/test/jvm/org/apache/storm/bolt/TestJoinBolt.java
index 840ccb1..837b985 100644
--- a/storm-client/test/jvm/org/apache/storm/bolt/TestJoinBolt.java
+++ b/storm-client/test/jvm/org/apache/storm/bolt/TestJoinBolt.java
@@ -331,10 +331,12 @@ public class TestJoinBolt {
this.fields = new Fields(fieldNames);
}
+ @Override
public String getComponentId(int taskId) {
return "component";
}
+ @Override
public Fields getComponentOutputFields(String componentId, String streamId) {
return fields;
}
diff --git a/storm-client/test/jvm/org/apache/storm/dependency/DependencyUploaderTest.java b/storm-client/test/jvm/org/apache/storm/dependency/DependencyUploaderTest.java
index 3183241..21c9f63 100644
--- a/storm-client/test/jvm/org/apache/storm/dependency/DependencyUploaderTest.java
+++ b/storm-client/test/jvm/org/apache/storm/dependency/DependencyUploaderTest.java
@@ -152,6 +152,7 @@ public class DependencyUploaderTest {
final AtomicInteger counter = new AtomicInteger();
final Answer incrementCounter = new Answer() {
+ @Override
public Object answer(InvocationOnMock invocation) throws Throwable {
counter.addAndGet(1);
return null;
@@ -271,6 +272,7 @@ public class DependencyUploaderTest {
final AtomicInteger counter = new AtomicInteger();
final Answer incrementCounter = new Answer() {
+ @Override
public Object answer(InvocationOnMock invocation) throws Throwable {
counter.addAndGet(1);
return null;
diff --git a/storm-clojure/src/main/java/org/apache/storm/clojure/ClojureTuple.java b/storm-clojure/src/main/java/org/apache/storm/clojure/ClojureTuple.java
index 8de15eb..7b41b67 100644
--- a/storm-clojure/src/main/java/org/apache/storm/clojure/ClojureTuple.java
+++ b/storm-clojure/src/main/java/org/apache/storm/clojure/ClojureTuple.java
@@ -152,6 +152,7 @@ public class ClojureTuple extends TupleImpl implements Seqable, Indexed, IMeta,
}
/* Indexed */
+ @Override
public Object nth(int i) {
if (i < size()) {
return getValue(i);
@@ -160,6 +161,7 @@ public class ClojureTuple extends TupleImpl implements Seqable, Indexed, IMeta,
}
}
+ @Override
public Object nth(int i, Object notfound) {
Object ret = nth(i);
if (ret==null) ret = notfound;
@@ -167,11 +169,13 @@ public class ClojureTuple extends TupleImpl implements Seqable, Indexed, IMeta,
}
/* Counted */
+ @Override
public int count() {
return size();
}
/* IMeta */
+ @Override
public IPersistentMap meta() {
if(_meta==null) {
_meta = new PersistentArrayMap( new Object[] {
diff --git a/storm-clojure/src/main/java/org/apache/storm/clojure/IndifferentAccessMap.java b/storm-clojure/src/main/java/org/apache/storm/clojure/IndifferentAccessMap.java
index 9bdefb8..55ba58b 100644
--- a/storm-clojure/src/main/java/org/apache/storm/clojure/IndifferentAccessMap.java
+++ b/storm-clojure/src/main/java/org/apache/storm/clojure/IndifferentAccessMap.java
@@ -48,14 +48,17 @@ public class IndifferentAccessMap implements ILookup, IPersistentMap, Map {
return _map;
}
+ @Override
public int size() {
return ((Map) getMap()).size();
}
+ @Override
public int count() {
return size();
}
+ @Override
public ISeq seq() {
return getMap().seq();
}
@@ -77,86 +80,105 @@ public class IndifferentAccessMap implements ILookup, IPersistentMap, Map {
/* IPersistentMap */
/* Naive implementation, but it might be good enough */
+ @Override
public IPersistentMap assoc(Object k, Object v) {
if(k instanceof Keyword) return assoc(((Keyword) k).getName(), v);
return new IndifferentAccessMap(getMap().assoc(k, v));
}
+ @Override
public IPersistentMap assocEx(Object k, Object v) {
if(k instanceof Keyword) return assocEx(((Keyword) k).getName(), v);
return new IndifferentAccessMap(getMap().assocEx(k, v));
}
+ @Override
public IPersistentMap without(Object k) {
if(k instanceof Keyword) return without(((Keyword) k).getName());
return new IndifferentAccessMap(getMap().without(k));
}
+ @Override
public boolean containsKey(Object k) {
if(k instanceof Keyword) return containsKey(((Keyword) k).getName());
return getMap().containsKey(k);
}
+ @Override
public IMapEntry entryAt(Object k) {
if(k instanceof Keyword) return entryAt(((Keyword) k).getName());
return getMap().entryAt(k);
}
+ @Override
public IPersistentCollection cons(Object o) {
return getMap().cons(o);
}
+ @Override
public IPersistentCollection empty() {
return new IndifferentAccessMap(PersistentArrayMap.EMPTY);
}
+ @Override
public boolean equiv(Object o) {
return getMap().equiv(o);
}
+ @Override
public Iterator iterator() {
return getMap().iterator();
}
/* Map */
+ @Override
public boolean containsValue(Object v) {
return ((Map) getMap()).containsValue(v);
}
+ @Override
public Set entrySet() {
return ((Map) getMap()).entrySet();
}
+ @Override
public Object get(Object k) {
return valAt(k);
}
+ @Override
public boolean isEmpty() {
return ((Map) getMap()).isEmpty();
}
+ @Override
public Set keySet() {
return ((Map) getMap()).keySet();
}
+ @Override
public Collection values() {
return ((Map) getMap()).values();
}
/* Not implemented */
+ @Override
public void clear() {
throw new UnsupportedOperationException();
}
+ @Override
public Object put(Object k, Object v) {
throw new UnsupportedOperationException();
}
+ @Override
public void putAll(Map m) {
throw new UnsupportedOperationException();
}
+ @Override
public Object remove(Object k) {
throw new UnsupportedOperationException();
}
diff --git a/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResourceRetentionSet.java b/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResourceRetentionSet.java
index 372334b..f96b481 100644
--- a/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResourceRetentionSet.java
+++ b/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResourceRetentionSet.java
@@ -135,6 +135,7 @@ public class LocalizedResourceRetentionSet {
}
static class LRUComparator implements Comparator<LocallyCachedBlob> {
+ @Override
public int compare(LocallyCachedBlob r1, LocallyCachedBlob r2) {
long ret = r1.getLastUsed() - r2.getLastUsed();
if (0 == ret) {
diff --git a/storm-server/src/main/java/org/apache/storm/logging/filters/AccessLoggingFilter.java b/storm-server/src/main/java/org/apache/storm/logging/filters/AccessLoggingFilter.java
index 5d7df0c..73997df 100644
--- a/storm-server/src/main/java/org/apache/storm/logging/filters/AccessLoggingFilter.java
+++ b/storm-server/src/main/java/org/apache/storm/logging/filters/AccessLoggingFilter.java
@@ -28,10 +28,12 @@ public class AccessLoggingFilter implements Filter {
private static final Logger LOG = LoggerFactory.getLogger(AccessLoggingFilter.class);
+ @Override
public void init(FilterConfig config) throws ServletException {
//NOOP
}
+ @Override
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
handle((HttpServletRequest) request, (HttpServletResponse) response, chain);
}
@@ -44,6 +46,7 @@ public class AccessLoggingFilter implements Filter {
chain.doFilter(request, response);
}
+ @Override
public void destroy() {
//NOOP
}
diff --git a/storm-server/src/main/java/org/apache/storm/metricstore/Metric.java b/storm-server/src/main/java/org/apache/storm/metricstore/Metric.java
index 19de781..5f8bee3 100644
--- a/storm-server/src/main/java/org/apache/storm/metricstore/Metric.java
+++ b/storm-server/src/main/java/org/apache/storm/metricstore/Metric.java
@@ -83,6 +83,7 @@ public class Metric implements Comparable<Metric> {
/**
* Check if a Metric matches another object.
*/
+ @Override
public boolean equals(Object other) {
if (!(other instanceof Metric)) {
diff --git a/storm-server/src/main/java/org/apache/storm/metricstore/MetricStore.java b/storm-server/src/main/java/org/apache/storm/metricstore/MetricStore.java
index bf2b840..82e4e35 100644
--- a/storm-server/src/main/java/org/apache/storm/metricstore/MetricStore.java
+++ b/storm-server/src/main/java/org/apache/storm/metricstore/MetricStore.java
@@ -45,6 +45,7 @@ public interface MetricStore extends AutoCloseable {
/**
* Close the metric store.
*/
+ @Override
void close();
/**
diff --git a/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbStore.java b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbStore.java
index 2ac1c20..6b7617e 100644
--- a/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbStore.java
+++ b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbStore.java
@@ -170,6 +170,7 @@ public class RocksDbStore implements MetricStore, AutoCloseable {
* @param metric Metric to store
* @throws MetricException if database write fails
*/
+ @Override
public void insert(Metric metric) throws MetricException {
try {
// don't bother blocking on a full queue, just drop metrics in case we can't keep up
@@ -341,6 +342,7 @@ public class RocksDbStore implements MetricStore, AutoCloseable {
* @param scanCallback callback for each Metric found
* @throws MetricException on error
*/
+ @Override
public void scan(FilterOptions filter, ScanCallback scanCallback) throws MetricException {
scanInternal(filter, scanCallback, null);
}
diff --git a/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/StringMetadataCache.java b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/StringMetadataCache.java
index d70076f..bf90c69 100644
--- a/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/StringMetadataCache.java
+++ b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/StringMetadataCache.java
@@ -101,6 +101,7 @@ public class StringMetadataCache implements LruMap.CacheEvictionCallback<String,
* @param s The string to look for
* @return the metadata associated with the string or null if not found
*/
+ @Override
public StringMetadata get(String s) {
return lruStringCache.get(s);
}
@@ -118,6 +119,7 @@ public class StringMetadataCache implements LruMap.CacheEvictionCallback<String,
* @param newEntry Indicates the metadata is being used for the first time and should be written to RocksDB immediately
* @throws MetricException when evicted data fails to save to the database or when the database is shutdown
*/
+ @Override
public void put(String s, StringMetadata stringMetadata, boolean newEntry) throws MetricException {
if (dbWriter.isShutdown()) {
// another thread could be writing out the metadata cache to the database.
@@ -142,6 +144,7 @@ public class StringMetadataCache implements LruMap.CacheEvictionCallback<String,
* @param val The evicted string's metadata
* @throws RuntimeException when evicted data fails to save to the database
*/
+ @Override
public void evictionCallback(String key, StringMetadata val) {
writeMetadataToDisk(key, val);
}
@@ -168,6 +171,7 @@ public class StringMetadataCache implements LruMap.CacheEvictionCallback<String,
* @param stringId The string Id to check
* @return true if the Id is in the cache, false otherwise
*/
+ @Override
public boolean contains(Integer stringId) {
return hashToString.containsKey(stringId);
}
@@ -178,6 +182,7 @@ public class StringMetadataCache implements LruMap.CacheEvictionCallback<String,
* @param stringId The string Id to check
* @return the associated string if the Id is in the cache, null otherwise
*/
+ @Override
public String getMetadataString(Integer stringId) {
return hashToString.get(stringId);
}
@@ -187,6 +192,7 @@ public class StringMetadataCache implements LruMap.CacheEvictionCallback<String,
*
* @return the string metadata map entrySet
*/
+ @Override
public Set<Map.Entry<String, StringMetadata>> entrySet() {
return lruStringCache.entrySet();
}
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/User.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/User.java
index ee79d59..94f2407 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/User.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/User.java
@@ -207,6 +207,7 @@ public class User {
*/
static class PQsortByPriorityAndSubmittionTime implements Comparator<TopologyDetails> {
+ @Override
public int compare(TopologyDetails topo1, TopologyDetails topo2) {
if (topo1.getTopologyPriority() > topo2.getTopologyPriority()) {
return 1;
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestUtilsForBlacklistScheduler.java b/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestUtilsForBlacklistScheduler.java
index e226097..7d2f3fe 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestUtilsForBlacklistScheduler.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestUtilsForBlacklistScheduler.java
@@ -182,13 +182,16 @@ public class TestUtilsForBlacklistScheduler {
_isDistributed = isDistributed;
}
+ @Override
public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
_collector = collector;
}
+ @Override
public void close() {
}
+ @Override
public void nextTuple() {
Utils.sleep(100);
final String[] words = new String[]{"nathan", "mike", "jackson", "golda", "bertels"};
@@ -197,12 +200,15 @@ public class TestUtilsForBlacklistScheduler {
_collector.emit(new Values(word));
}
+ @Override
public void ack(Object msgId) {
}
+ @Override
public void fail(Object msgId) {
}
+ @Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
index 366ee78..e29830e 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
@@ -316,13 +316,16 @@ public class TestUtilsForResourceAwareScheduler {
_isDistributed = isDistributed;
}
+ @Override
public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
_collector = collector;
}
+ @Override
public void close() {
}
+ @Override
public void nextTuple() {
Utils.sleep(100);
final String[] words = new String[]{ "nathan", "mike", "jackson", "golda", "bertels" };
@@ -331,12 +334,15 @@ public class TestUtilsForResourceAwareScheduler {
_collector.emit(new Values(word));
}
+ @Override
public void ack(Object msgId) {
}
+ @Override
public void fail(Object msgId) {
}
+ @Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/drpc/webapp/ReqContextFilter.java b/storm-webapp/src/main/java/org/apache/storm/daemon/drpc/webapp/ReqContextFilter.java
index e16c6db..f5dbc80 100644
--- a/storm-webapp/src/main/java/org/apache/storm/daemon/drpc/webapp/ReqContextFilter.java
+++ b/storm-webapp/src/main/java/org/apache/storm/daemon/drpc/webapp/ReqContextFilter.java
@@ -52,7 +52,8 @@ public class ReqContextFilter implements Filter {
httpCredsHandler.populateContext(ReqContext.context(), request);
}
}
-
+
+ @Override
public void init(FilterConfig config) throws ServletException {
//NOOP
//We could add in configs through the web.xml if we wanted something stand alone here...
@@ -64,6 +65,7 @@ public class ReqContextFilter implements Filter {
* @param response the response to populate
* @param chain the next chain of entities to pass the object to
*/
+ @Override
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
handle((HttpServletRequest)request, (HttpServletResponse)response, chain);
}
@@ -81,6 +83,7 @@ public class ReqContextFilter implements Filter {
chain.doFilter(request, response);
}
+ @Override
public void destroy() {
//NOOP
}
diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/ui/exceptionmappers/NotAliveExceptionMapper.java b/storm-webapp/src/main/java/org/apache/storm/daemon/ui/exceptionmappers/NotAliveExceptionMapper.java
index 0cff689..fc69511 100644
--- a/storm-webapp/src/main/java/org/apache/storm/daemon/ui/exceptionmappers/NotAliveExceptionMapper.java
+++ b/storm-webapp/src/main/java/org/apache/storm/daemon/ui/exceptionmappers/NotAliveExceptionMapper.java
@@ -34,6 +34,7 @@ public class NotAliveExceptionMapper implements ExceptionMapper<NotAliveExceptio
@Inject
public javax.inject.Provider<HttpServletRequest> request;
+ @Override
public Response toResponse(NotAliveException ex) {
return getResponse(ex, request);
}