You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/12/08 18:14:36 UTC
[2/5] storm git commit: Merge branch 'master' of
https://github.com/apache/storm into STORM-2104
Merge branch 'master' of https://github.com/apache/storm into STORM-2104
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/bd1f5c54
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/bd1f5c54
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/bd1f5c54
Branch: refs/heads/master
Commit: bd1f5c54752f67b484a83c26667331234234d3a3
Parents: 11fcaff a912cf3
Author: Stig Rohde Døssing <sd...@it-minds.dk>
Authored: Sun Nov 20 12:49:49 2016 +0100
Committer: Stig Rohde Døssing <sd...@it-minds.dk>
Committed: Sun Nov 20 13:01:22 2016 +0100
----------------------------------------------------------------------
CHANGELOG.md | 16 +
examples/storm-kafka-client-examples/pom.xml | 134 +++
.../TridentKafkaClientWordCountNamedTopics.java | 156 ++++
...identKafkaClientWordCountWildcardTopics.java | 46 ++
examples/storm-kafka-examples/pom.xml | 18 +
.../storm/kafka/TridentKafkaTopology.java | 91 ---
.../kafka/trident/KafkaProducerTopology.java | 67 ++
.../storm/kafka/trident/LocalSubmitter.java | 91 +++
.../trident/TridentKafkaConsumerTopology.java | 94 +++
.../kafka/trident/TridentKafkaTopology.java | 89 ++
.../kafka/trident/TridentKafkaWordCount.java | 140 ++++
examples/storm-starter/pom.xml | 16 -
.../starter/trident/DebugMemoryMapState.java | 2 +-
.../starter/trident/TridentKafkaWordCount.java | 278 -------
external/sql/pom.xml | 1 +
.../apache/storm/sql/parser/SqlCreateTable.java | 20 +-
.../test/org/apache/storm/sql/TestStormSql.java | 11 +-
.../storm-sql-external/storm-sql-kafka/pom.xml | 4 -
.../sql/kafka/KafkaDataSourcesProvider.java | 63 +-
.../sql/kafka/TestKafkaDataSourcesProvider.java | 31 +-
.../storm-sql-mongodb/pom.xml | 84 ++
.../sql/mongodb/MongoDataSourcesProvider.java | 121 +++
...apache.storm.sql.runtime.DataSourcesProvider | 16 +
.../mongodb/TestMongoDataSourcesProvider.java | 122 +++
.../storm-sql-external/storm-sql-redis/pom.xml | 4 -
.../sql/redis/RedisDataSourcesProvider.java | 48 +-
.../sql/redis/TestRedisDataSourcesProvider.java | 26 +-
external/sql/storm-sql-runtime/pom.xml | 14 +
.../storm/sql/runtime/DataSourcesProvider.java | 5 +-
.../storm/sql/runtime/DataSourcesRegistry.java | 5 +-
.../sql/runtime/serde/avro/AvroScheme.java | 74 ++
.../sql/runtime/serde/avro/AvroSerializer.java | 72 ++
.../sql/runtime/serde/avro/CachedSchemas.java | 41 +
.../storm/sql/runtime/serde/csv/CsvScheme.java | 70 ++
.../sql/runtime/serde/csv/CsvSerializer.java | 59 ++
.../sql/runtime/serde/json/JsonScheme.java | 2 +-
.../storm/sql/runtime/serde/tsv/TsvScheme.java | 58 ++
.../sql/runtime/serde/tsv/TsvSerializer.java | 54 ++
.../storm/sql/runtime/utils/FieldInfoUtils.java | 39 +
.../storm/sql/runtime/utils/SerdeUtils.java | 123 +++
.../apache/storm/sql/runtime/utils/Utils.java | 55 ++
.../apache/storm/sql/TestAvroSerializer.java | 72 ++
.../org/apache/storm/sql/TestCsvSerializer.java | 54 ++
.../org/apache/storm/sql/TestTsvSerializer.java | 46 ++
.../storm/hdfs/bolt/AbstractHdfsBolt.java | 25 +-
external/storm-kafka-client/pom.xml | 26 +
.../apache/storm/kafka/spout/KafkaSpout.java | 31 +-
.../storm/kafka/spout/KafkaSpoutStream.java | 2 +-
.../storm/kafka/spout/KafkaSpoutStreams.java | 3 +
.../spout/KafkaSpoutStreamsNamedTopics.java | 11 +
.../spout/KafkaSpoutStreamsWildcardTopics.java | 6 +
.../trident/KafkaTridentSpoutBatchMetadata.java | 83 ++
.../spout/trident/KafkaTridentSpoutEmitter.java | 197 +++++
.../spout/trident/KafkaTridentSpoutManager.java | 119 +++
.../spout/trident/KafkaTridentSpoutOpaque.java | 80 ++
.../KafkaTridentSpoutOpaqueCoordinator.java | 64 ++
.../KafkaTridentSpoutTopicPartition.java | 68 ++
...KafkaTridentSpoutTopicPartitionRegistry.java | 48 ++
.../trident/KafkaTridentSpoutTransactional.java | 48 ++
.../kafka/spout/KafkaSpoutRebalanceTest.java | 4 +-
.../kafka/spout/SingleTopicKafkaSpoutTest.java | 240 ++++++
.../SingleTopicKafkaSpoutConfiguration.java | 30 +-
.../builders/TopicKeyValueTupleBuilder.java | 2 +-
.../kafka/trident/OpaqueTridentKafkaSpout.java | 5 +-
pom.xml | 1 +
.../org/apache/storm/daemon/local_executor.clj | 42 -
.../src/clj/org/apache/storm/daemon/worker.clj | 818 -------------------
storm-core/src/clj/org/apache/storm/testing.clj | 28 +-
.../src/jvm/org/apache/storm/Constants.java | 6 +-
.../src/jvm/org/apache/storm/daemon/Task.java | 46 +-
.../storm/daemon/supervisor/AdvancedFSOps.java | 9 +-
.../storm/daemon/supervisor/BasicContainer.java | 2 +-
.../storm/daemon/supervisor/LocalContainer.java | 11 +-
.../daemon/supervisor/ReadClusterState.java | 2 +-
.../storm/daemon/worker/LogConfigManager.java | 156 ++++
.../org/apache/storm/daemon/worker/Worker.java | 454 ++++++++++
.../apache/storm/daemon/worker/WorkerState.java | 691 ++++++++++++++++
.../jvm/org/apache/storm/executor/Executor.java | 55 +-
.../apache/storm/executor/ExecutorShutdown.java | 5 +-
.../apache/storm/executor/ExecutorTransfer.java | 15 +-
.../apache/storm/executor/IRunningExecutor.java | 2 +-
.../apache/storm/executor/LocalExecutor.java | 56 ++
.../storm/executor/bolt/BoltExecutor.java | 10 +-
.../storm/executor/spout/SpoutExecutor.java | 3 +-
.../DeserializingConnectionCallback.java | 10 +-
.../storm/scheduler/resource/RAS_Node.java | 4 +-
.../org/apache/storm/security/auth/AutoSSL.java | 161 ++++
.../storm/security/auth/ThriftClient.java | 13 +-
.../org/apache/storm/task/TopologyContext.java | 3 +
.../org/apache/storm/utils/TransferDrainer.java | 26 +-
.../src/jvm/org/apache/storm/utils/Utils.java | 116 ++-
.../apache/storm/messaging/netty_unit_test.clj | 13 +-
.../test/clj/org/apache/storm/nimbus_test.clj | 6 +-
.../test/clj/org/apache/storm/worker_test.clj | 205 -----
.../daemon/supervisor/BasicContainerTest.java | 2 +-
.../daemon/worker/LogConfigManagerTest.java | 219 +++++
.../apache/storm/daemon/worker/WorkerTest.java | 39 +
.../apache/storm/security/auth/AutoSSLTest.java | 136 +++
storm-dist/binary/src/main/assembly/binary.xml | 7 +
99 files changed, 5299 insertions(+), 1797 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/bd1f5c54/external/storm-kafka-client/pom.xml
----------------------------------------------------------------------
diff --cc external/storm-kafka-client/pom.xml
index 34b9fdc,2d9a844..2b56336
--- a/external/storm-kafka-client/pom.xml
+++ b/external/storm-kafka-client/pom.xml
@@@ -70,8 -77,21 +84,20 @@@
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
- <version>4.11</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>info.batey.kafka</groupId>
+ <artifactId>kafka-unit</artifactId>
+ <version>0.6</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>log4j-over-slf4j</artifactId>
+ <version>${log4j-over-slf4j.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/storm/blob/bd1f5c54/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --cc external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index 5534082,439492b..ce01a76
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@@ -299,19 -289,18 +302,21 @@@ public class KafkaSpout<K, V> extends B
LOG.trace("Tuple for record [{}] has already been acked. Skipping", record);
} else if (emitted.contains(msgId)) { // has been emitted and it's pending ack or fail
LOG.trace("Tuple for record [{}] has already been emitted. Skipping", record);
- } else if (!retryService.isScheduled(msgId) || retryService.isReady(msgId)) { // not scheduled <=> never failed (i.e. never emitted) or ready to be retried
- final List<Object> tuple = tuplesBuilder.buildTuple(record);
- kafkaSpoutStreams.emit(collector, tuple, msgId);
- emitted.add(msgId);
- numUncommittedOffsets++;
- if (retryService.isReady(msgId)) { // has failed. Is it ready for retry ?
- retryService.remove(msgId); // re-emitted hence remove from failed
+ } else {
+ boolean isScheduled = retryService.isScheduled(msgId);
+ if (!isScheduled || retryService.isReady(msgId)) { // not scheduled <=> never failed (i.e. never emitted) or ready to be retried
+ final List<Object> tuple = tuplesBuilder.buildTuple(record);
+ kafkaSpoutStreams.emit(collector, tuple, msgId);
+ emitted.add(msgId);
+ numUncommittedOffsets++;
+ if (isScheduled) { // Was scheduled for retry, now being re-emitted. Remove from schedule.
+ retryService.remove(msgId);
+ }
+ LOG.trace("Emitted tuple [{}] for record [{}]", tuple, record);
++ return true;
}
- LOG.trace("Emitted tuple [{}] for record [{}]", tuple, record);
- return true;
}
+ return false;
}
private void commitOffsetsForAckedTuples() {
http://git-wip-us.apache.org/repos/asf/storm/blob/bd1f5c54/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
----------------------------------------------------------------------
diff --cc external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
index b5428f9,0000000..3e077ab
mode 100644,000000..100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
@@@ -1,166 -1,0 +1,166 @@@
+/*
+ * Copyright 2016 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
+import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutStreams;
+import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import static org.mockito.Matchers.anyCollection;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.when;
+import org.junit.Before;
+import org.mockito.Captor;
+import static org.mockito.Mockito.reset;
+import org.mockito.MockitoAnnotations;
+import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfig;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.Matchers.hasKey;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+
+public class KafkaSpoutRebalanceTest {
+
+ @Captor
+ private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture;
+
+ private TopologyContext contextMock;
+ private SpoutOutputCollector collectorMock;
+ private Map conf;
+ private KafkaConsumer<String, String> consumerMock;
+ private KafkaConsumerFactory<String, String> consumerFactoryMock;
+
+ @Before
+ public void setUp() {
+ MockitoAnnotations.initMocks(this);
+ contextMock = mock(TopologyContext.class);
+ collectorMock = mock(SpoutOutputCollector.class);
+ conf = new HashMap<>();
+ consumerMock = mock(KafkaConsumer.class);
+ consumerFactoryMock = (kafkaSpoutConfig) -> consumerMock;
+ }
+
+ //Returns messageIds in order of emission
+ private List<KafkaSpoutMessageId> emitOneMessagePerPartitionThenRevokeOnePartition(KafkaSpout<String, String> spout, TopicPartition partitionThatWillBeRevoked, TopicPartition assignedPartition) {
+ //Setup spout with mock consumer so we can get at the rebalance listener
+ spout.open(conf, contextMock, collectorMock);
+ spout.activate();
+
+ ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
+ verify(consumerMock).subscribe(anyCollection(), rebalanceListenerCapture.capture());
+
+ //Assign partitions to the spout
+ ConsumerRebalanceListener consumerRebalanceListener = rebalanceListenerCapture.getValue();
+ List<TopicPartition> assignedPartitions = new ArrayList<>();
+ assignedPartitions.add(partitionThatWillBeRevoked);
+ assignedPartitions.add(assignedPartition);
+ consumerRebalanceListener.onPartitionsAssigned(assignedPartitions);
+
+ //Make the consumer return a single message for each partition
+ Map<TopicPartition, List<ConsumerRecord<String, String>>> firstPartitionRecords = new HashMap<>();
+ firstPartitionRecords.put(partitionThatWillBeRevoked, Collections.singletonList(new ConsumerRecord(partitionThatWillBeRevoked.topic(), partitionThatWillBeRevoked.partition(), 0L, "key", "value")));
+ Map<TopicPartition, List<ConsumerRecord<String, String>>> secondPartitionRecords = new HashMap<>();
+ secondPartitionRecords.put(assignedPartition, Collections.singletonList(new ConsumerRecord(assignedPartition.topic(), assignedPartition.partition(), 0L, "key", "value")));
+ when(consumerMock.poll(anyLong()))
+ .thenReturn(new ConsumerRecords(firstPartitionRecords))
+ .thenReturn(new ConsumerRecords(secondPartitionRecords))
+ .thenReturn(new ConsumerRecords(Collections.emptyMap()));
+
+ //Emit the messages
+ spout.nextTuple();
+ ArgumentCaptor<KafkaSpoutMessageId> messageIdForRevokedPartition = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+ verify(collectorMock).emit(anyObject(), anyObject(), messageIdForRevokedPartition.capture());
+ reset(collectorMock);
+ spout.nextTuple();
+ ArgumentCaptor<KafkaSpoutMessageId> messageIdForAssignedPartition = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+ verify(collectorMock).emit(anyObject(), anyObject(), messageIdForAssignedPartition.capture());
+
+ //Now rebalance
+ consumerRebalanceListener.onPartitionsRevoked(assignedPartitions);
+ consumerRebalanceListener.onPartitionsAssigned(Collections.singleton(assignedPartition));
+
+ List<KafkaSpoutMessageId> emittedMessageIds = new ArrayList<>();
+ emittedMessageIds.add(messageIdForRevokedPartition.getValue());
+ emittedMessageIds.add(messageIdForAssignedPartition.getValue());
+ return emittedMessageIds;
+ }
+
+ @Test
+ public void spoutMustIgnoreAcksForTuplesItIsNotAssignedAfterRebalance() throws Exception {
+ //Acking tuples for partitions that are no longer assigned is useless since the spout will not be allowed to commit them
+ KafkaSpout<String, String> spout = new KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams(), -1, 10), consumerFactoryMock);
- String topic = SingleTopicKafkaSpoutConfiguration.topic;
++ String topic = SingleTopicKafkaSpoutConfiguration.TOPIC;
+ TopicPartition partitionThatWillBeRevoked = new TopicPartition(topic, 1);
+ TopicPartition assignedPartition = new TopicPartition(topic, 2);
+
+ //Emit a message on each partition and revoke the first partition
+ List<KafkaSpoutMessageId> emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition(spout, partitionThatWillBeRevoked, assignedPartition);
+
+ //Ack both emitted tuples
+ spout.ack(emittedMessageIds.get(0));
+ spout.ack(emittedMessageIds.get(1));
+
+ //Ensure the commit timer has expired
+ Thread.sleep(510);
+
+ //Make the spout commit any acked tuples
+ spout.nextTuple();
+ //Verify that it only committed the message on the assigned partition
+ verify(consumerMock).commitSync(commitCapture.capture());
+
+ Map<TopicPartition, OffsetAndMetadata> commitCaptureMap = commitCapture.getValue();
+ assertThat(commitCaptureMap, hasKey(assignedPartition));
+ assertThat(commitCaptureMap, not(hasKey(partitionThatWillBeRevoked)));
+ }
+
+ @Test
+ public void spoutMustIgnoreFailsForTuplesItIsNotAssignedAfterRebalance() throws Exception {
+ //Failing tuples for partitions that are no longer assigned is useless since the spout will not be allowed to commit them if they later pass
+ KafkaSpoutRetryService retryServiceMock = mock(KafkaSpoutRetryService.class);
+ KafkaSpout<String, String> spout = new KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams(), -1, 10, retryServiceMock), consumerFactoryMock);
- String topic = SingleTopicKafkaSpoutConfiguration.topic;
++ String topic = SingleTopicKafkaSpoutConfiguration.TOPIC;
+ TopicPartition partitionThatWillBeRevoked = new TopicPartition(topic, 1);
+ TopicPartition assignedPartition = new TopicPartition(topic, 2);
+
+ //Emit a message on each partition and revoke the first partition
+ List<KafkaSpoutMessageId> emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition(spout, partitionThatWillBeRevoked, assignedPartition);
+
+ //Fail both emitted tuples
+ spout.fail(emittedMessageIds.get(0));
+ spout.fail(emittedMessageIds.get(1));
+
+ //Check that only the tuple on the currently assigned partition is retried
+ verify(retryServiceMock, never()).schedule(emittedMessageIds.get(0));
+ verify(retryServiceMock).schedule(emittedMessageIds.get(1));
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/bd1f5c54/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
----------------------------------------------------------------------
diff --cc external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
index 157543c,baece93..6921f7c
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
@@@ -46,17 -46,9 +46,17 @@@ public class SingleTopicKafkaSpoutConfi
return tp.createTopology();
}
- static public KafkaSpoutConfig<String,String> getKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams, int port) {
- public static KafkaSpoutConfig<String,String> getKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams, int port) {
- return new KafkaSpoutConfig.Builder<String, String>(getKafkaConsumerProps(port), kafkaSpoutStreams, getTuplesBuilder(), getRetryService())
- .setOffsetCommitPeriodMs(10_000)
++ static public KafkaSpoutConfig<String, String> getKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams, int port) {
+ return getKafkaSpoutConfig(kafkaSpoutStreams, port, 10_000);
+ }
-
- static public KafkaSpoutConfig<String,String> getKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams, int port, long offsetCommitPeriodMs) {
++
++ static public KafkaSpoutConfig<String, String> getKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams, int port, long offsetCommitPeriodMs) {
+ return getKafkaSpoutConfig(kafkaSpoutStreams, port, offsetCommitPeriodMs, getRetryService());
+ }
+
+ static public KafkaSpoutConfig<String,String> getKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams, int port, long offsetCommitPeriodMs, KafkaSpoutRetryService retryService) {
+ return new KafkaSpoutConfig.Builder<>(getKafkaConsumerProps(port), kafkaSpoutStreams, getTuplesBuilder(), retryService)
+ .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
.setFirstPollOffsetStrategy(EARLIEST)
.setMaxUncommittedOffsets(250)
.setPollTimeoutMs(1000)