You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2015/08/28 00:32:31 UTC
[6/7] storm git commit: Merge branch 'master' into STORM-974
Merge branch 'master' into STORM-974
Conflicts:
external/storm-elasticsearch/README.md
external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltTest.java
external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java
external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/28199890
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/28199890
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/28199890
Branch: refs/heads/master
Commit: 2819989001fe0352aa65201192358cdc69fc9356
Parents: 40472e0 4faee0e
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Fri Aug 28 07:31:00 2015 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Fri Aug 28 07:31:00 2015 +0900
----------------------------------------------------------------------
.gitignore | 2 +-
CHANGELOG.md | 19 +
README.markdown | 8 +
SECURITY.md | 2 +-
STORM-UI-REST-API.md | 41 +-
TODO | 178 -----
bin/storm-config.cmd | 18 +-
bin/storm.py | 22 +-
conf/defaults.yaml | 10 +-
dev-tools/travis/travis-script.sh | 2 +-
docs/documentation/FAQ.md | 7 +-
.../nimbus_ha_leader_election_and_failover.png | Bin 0 -> 154316 bytes
.../images/nimbus_ha_topology_submission.png | Bin 0 -> 134180 bytes
docs/documentation/nimbus-ha-design.md | 217 +++++
examples/storm-starter/pom.xml | 29 +
.../starter/trident/TridentKafkaWordCount.java | 230 ++++++
.../flux-core/src/test/resources/log4j2.xml | 34 +
.../flux-core/src/test/resources/logback.xml | 30 -
external/storm-elasticsearch/README.md | 20 +-
external/storm-elasticsearch/pom.xml | 10 +-
.../elasticsearch/ElasticsearchGetRequest.java | 36 +
.../elasticsearch/EsLookupResultOutput.java | 43 +
.../elasticsearch/bolt/AbstractEsBolt.java | 50 +-
.../elasticsearch/bolt/ElasticSearchClient.java | 57 ++
.../storm/elasticsearch/bolt/EsLookupBolt.java | 71 ++
.../elasticsearch/bolt/TransportAddresses.java | 72 ++
.../bolt/AbstractEsBoltIntegrationTest.java | 91 +++
.../elasticsearch/bolt/AbstractEsBoltTest.java | 75 +-
.../elasticsearch/bolt/EsIndexBoltTest.java | 33 +-
.../bolt/EsLookupBoltIntegrationTest.java | 132 +++
.../elasticsearch/bolt/EsLookupBoltTest.java | 120 +++
.../elasticsearch/bolt/EsPercolateBoltTest.java | 26 +-
.../bolt/TransportAddressesTest.java | 81 ++
external/storm-hbase/pom.xml | 2 +-
external/storm-hdfs/README.md | 9 +
external/storm-hdfs/pom.xml | 15 +-
.../ha/codedistributor/HDFSCodeDistributor.java | 101 +++
.../apache/storm/hdfs/trident/HdfsState.java | 392 +++++++--
.../trident/rotation/FileRotationPolicy.java | 14 +
.../rotation/FileSizeRotationPolicy.java | 13 +
.../hdfs/trident/rotation/NoRotationPolicy.java | 10 +
.../trident/rotation/TimedRotationPolicy.java | 31 +-
.../storm/hdfs/trident/HdfsStateTest.java | 206 +++++
external/storm-hive/README.md | 3 +-
.../org/apache/storm/hive/bolt/HiveBolt.java | 39 +-
.../apache/storm/hive/common/HiveOptions.java | 11 +
.../apache/storm/hive/bolt/TestHiveBolt.java | 12 +-
external/storm-jdbc/README.md | 6 +-
.../storm/jdbc/trident/state/JdbcState.java | 2 +-
.../UserPersistanceTridentTopology.java | 2 +-
external/storm-kafka/CHANGELOG.md | 13 -
external/storm-kafka/README.md | 3 +
.../src/jvm/storm/kafka/KafkaSpout.java | 6 +-
.../src/jvm/storm/kafka/PartitionManager.java | 10 +-
.../src/jvm/storm/kafka/SpoutConfig.java | 3 +
external/storm-solr/README.md | 201 +++++
external/storm-solr/pom.xml | 98 +++
.../apache/storm/solr/bolt/SolrUpdateBolt.java | 136 ++++
.../storm/solr/config/CountBasedCommit.java | 59 ++
.../storm/solr/config/SolrCommitStrategy.java | 30 +
.../apache/storm/solr/config/SolrConfig.java | 42 +
.../storm/solr/mapper/SolrFieldsMapper.java | 182 +++++
.../storm/solr/mapper/SolrJsonMapper.java | 160 ++++
.../apache/storm/solr/mapper/SolrMapper.java | 32 +
.../storm/solr/mapper/SolrMapperException.java | 24 +
.../org/apache/storm/solr/schema/CopyField.java | 50 ++
.../org/apache/storm/solr/schema/Field.java | 50 ++
.../org/apache/storm/solr/schema/FieldType.java | 63 ++
.../org/apache/storm/solr/schema/Schema.java | 116 +++
.../storm/solr/schema/SolrFieldTypeFinder.java | 182 +++++
.../schema/builder/RestJsonSchemaBuilder.java | 69 ++
.../solr/schema/builder/SchemaBuilder.java | 27 +
.../apache/storm/solr/trident/SolrState.java | 67 ++
.../storm/solr/trident/SolrStateFactory.java | 44 +
.../apache/storm/solr/trident/SolrUpdater.java | 33 +
.../storm/solr/spout/SolrFieldsSpout.java | 76 ++
.../apache/storm/solr/spout/SolrJsonSpout.java | 120 +++
.../storm/solr/topology/SolrFieldsTopology.java | 56 ++
.../storm/solr/topology/SolrJsonTopology.java | 48 ++
.../storm/solr/topology/SolrTopology.java | 82 ++
.../solr/trident/SolrFieldsTridentTopology.java | 45 ++
.../solr/trident/SolrJsonTridentTopology.java | 45 ++
.../org/apache/storm/solr/util/TestUtil.java | 30 +
pom.xml | 19 +-
storm-core/pom.xml | 79 +-
storm-core/src/clj/backtype/storm/cluster.clj | 94 ++-
.../backtype/storm/command/shell_submission.clj | 9 +-
storm-core/src/clj/backtype/storm/config.clj | 15 +-
.../src/clj/backtype/storm/daemon/logviewer.clj | 2 +-
.../src/clj/backtype/storm/daemon/nimbus.clj | 252 ++++--
.../clj/backtype/storm/daemon/supervisor.clj | 66 +-
storm-core/src/clj/backtype/storm/thrift.clj | 23 +-
storm-core/src/clj/backtype/storm/timer.clj | 7 +-
storm-core/src/clj/backtype/storm/ui/core.clj | 95 ++-
storm-core/src/clj/backtype/storm/zookeeper.clj | 108 ++-
storm-core/src/jvm/backtype/storm/Config.java | 69 +-
.../jvm/backtype/storm/ConfigValidation.java | 13 +-
.../storm/codedistributor/ICodeDistributor.java | 56 ++
.../LocalFileSystemCodeDistributor.java | 106 +++
.../storm/generated/ClusterSummary.java | 292 ++++---
.../backtype/storm/generated/NimbusSummary.java | 796 +++++++++++++++++++
.../backtype/storm/generated/TopologyInfo.java | 221 +++--
.../storm/generated/TopologySummary.java | 107 ++-
.../backtype/storm/nimbus/ILeaderElector.java | 60 ++
.../jvm/backtype/storm/nimbus/NimbusInfo.java | 93 +++
.../security/auth/SaslTransportPlugin.java | 6 +
.../kerberos/KerberosSaslTransportPlugin.java | 5 +-
.../security/auth/kerberos/NoOpTTrasport.java | 40 +
.../src/jvm/backtype/storm/task/ShellBolt.java | 8 +-
.../jvm/backtype/storm/utils/NimbusClient.java | 78 +-
.../jvm/backtype/storm/utils/RotatingMap.java | 12 +-
.../jvm/backtype/storm/utils/TimeCacheMap.java | 60 +-
.../src/jvm/backtype/storm/utils/Utils.java | 9 +
storm-core/src/py/storm/ttypes.py | 613 ++++++++------
storm-core/src/storm.thrift | 12 +-
storm-core/src/ui/public/index.html | 21 +
.../public/templates/index-page-template.html | 59 +-
.../templates/topology-page-template.html | 18 +-
storm-core/src/ui/public/topology.html | 35 +-
.../test/clj/backtype/storm/cluster_test.clj | 23 +-
.../test/clj/backtype/storm/config_test.clj | 4 +-
.../storm/messaging/netty_unit_test.clj | 2 +-
.../test/clj/backtype/storm/nimbus_test.clj | 210 +++--
.../backtype/storm/security/auth/auth_test.clj | 4 +-
.../storm/security/auth/nimbus_auth_test.clj | 14 +-
.../test/clj/backtype/storm/supervisor_test.clj | 1 +
.../test/clj/backtype/storm/utils_test.clj | 12 -
storm-dist/binary/src/main/assembly/binary.xml | 14 +
128 files changed, 7411 insertions(+), 1267 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/28199890/external/storm-elasticsearch/README.md
----------------------------------------------------------------------
diff --cc external/storm-elasticsearch/README.md
index 24355db,1b18019..a4792c7
--- a/external/storm-elasticsearch/README.md
+++ b/external/storm-elasticsearch/README.md
@@@ -36,23 -34,27 +36,41 @@@ EsPercolateBolt percolateBolt = new EsP
If there exists non-empty percolate response, EsPercolateBolt will emit tuple with original source and Percolate.Match
for each Percolate.Match in PercolateResponse.
+## EsState (org.apache.storm.elasticsearch.trident.EsState)
+
+Elasticsearch Trident state also follows similar pattern to EsBolts. It takes in EsConfig and EsTupleMapper as an arg.
+
+```code
+ EsConfig esConfig = new EsConfig();
+ esConfig.setClusterName(clusterName);
+ esConfig.setNodes(new String[]{"localhost:9300"});
+ EsTupleMapper tupleMapper = new DefaultEsTupleMapper();
+
+ StateFactory factory = new EsStateFactory(esConfig, tupleMapper);
+ TridentState state = stream.partitionPersist(factory, esFields, new EsUpdater(), new Fields());
+ ```
+
+ ## EsLookupBolt (org.apache.storm.elasticsearch.bolt.EsLookupBolt)
+
+ EsLookupBolt performs a get request to Elasticsearch.
+ In order to do that, three dependencies need to be satisfied. Apart from usual EsConfig, two other dependencies must be provided:
+ ElasticsearchGetRequest is used to convert the incoming Tuple to the GetRequest that will be executed against Elasticsearch.
+ EsLookupResultOutput is used to declare the output fields and convert the GetResponse to values that are emited by the bolt.
+
+ Incoming tuple is passed to provided GetRequest creator and the result of that execution is passed to Elasticsearch client.
+ The bolt then uses the provider output adapter (EsLookupResultOutput) to convert the GetResponse to Values to emit.
+ The output fields are also specified by the user of the bolt via the output adapter (EsLookupResultOutput).
+
+ ```java
+ EsConfig esConfig = createEsConfig();
+ ElasticsearchGetRequest getRequestAdapter = createElasticsearchGetRequest();
+ EsLookupResultOutput output = createOutput();
+ EsLookupBolt lookupBolt = new EsLookupBolt(esConfig, getRequestAdapter, output);
+ ```
+
## EsConfig (org.apache.storm.elasticsearch.common.EsConfig)
-Two bolts above takes in EsConfig as a constructor arg.
+Provided components (Bolt, State) takes in EsConfig as a constructor arg.
```java
EsConfig esConfig = new EsConfig();
http://git-wip-us.apache.org/repos/asf/storm/blob/28199890/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltTest.java
----------------------------------------------------------------------
diff --cc external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltTest.java
index 0ad549d,d9c09d0..17ccdc9
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltTest.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltTest.java
@@@ -17,63 -17,43 +17,42 @@@
*/
package org.apache.storm.elasticsearch.bolt;
+import backtype.storm.Config;
+import backtype.storm.task.OutputCollector;
- import org.apache.commons.io.FileUtils;
- import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
- import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
- import org.elasticsearch.client.Requests;
- import org.elasticsearch.cluster.ClusterName;
- import org.elasticsearch.cluster.metadata.IndexMetaData;
- import org.elasticsearch.common.Priority;
- import org.elasticsearch.common.settings.ImmutableSettings;
- import org.elasticsearch.common.unit.TimeValue;
- import org.elasticsearch.common.util.concurrent.EsExecutors;
- import org.elasticsearch.node.Node;
- import org.elasticsearch.node.NodeBuilder;
- import org.junit.AfterClass;
- import org.junit.BeforeClass;
+ import org.apache.storm.elasticsearch.common.EsConfig;
+ import org.junit.After;
+ import org.junit.Before;
+ import org.junit.runner.RunWith;
+ import org.mockito.Mock;
+ import org.mockito.runners.MockitoJUnitRunner;
- import java.io.File;
-import backtype.storm.Config;
-import backtype.storm.task.OutputCollector;
-
+ @RunWith(MockitoJUnitRunner.class)
+ public abstract class AbstractEsBoltTest<Bolt extends AbstractEsBolt> {
- import static org.hamcrest.CoreMatchers.equalTo;
- import static org.hamcrest.MatcherAssert.assertThat;
- import static org.mockito.Mockito.mock;
-
- public class AbstractEsBoltTest {
protected static Config config = new Config();
- protected static OutputCollector collector = mock(OutputCollector.class);
- protected static Node node;
- @BeforeClass
- public static void setup() throws Exception {
- node = NodeBuilder.nodeBuilder().data(true).settings(
- ImmutableSettings.builder()
- .put(ClusterName.SETTING, "test-cluster")
- .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
- .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
- .put(EsExecutors.PROCESSORS, 1)
- .put("http.enabled", false)
- .put("index.percolator.map_unmapped_fields_as_string", true)
- .put("index.store.type", "memory")
- ).build();
- node.start();
- ensureEsGreen(node);
- ClusterHealthResponse chr = node.client().admin().cluster()
- .health(Requests.clusterHealthRequest().timeout(TimeValue.timeValueSeconds(30)).waitForGreenStatus().waitForRelocatingShards(0)).actionGet();
- Thread.sleep(1000);
+ @Mock
+ protected OutputCollector outputCollector;
+
+ protected Bolt bolt;
+
+ @Before
+ public void createBolt() throws Exception {
+ bolt = createBolt(esConfig());
+ bolt.prepare(config, null, outputCollector);
}
- @AfterClass
- public static void cleanup() throws Exception {
- node.stop();
- node.close();
- FileUtils.deleteDirectory(new File("./data"));
+ protected abstract Bolt createBolt(EsConfig esConfig);
+
+ protected EsConfig esConfig() {
+ EsConfig esConfig = new EsConfig();
+ esConfig.setClusterName("test-cluster");
+ esConfig.setNodes(new String[] {"127.0.0.1:9300"});
+ return esConfig;
}
- private static void ensureEsGreen(Node node) {
- ClusterHealthResponse chr = node.client().admin().cluster()
- .health(Requests.clusterHealthRequest().timeout(TimeValue.timeValueSeconds(30)).waitForGreenStatus().waitForEvents(Priority.LANGUID).waitForRelocatingShards(0)).actionGet();
- assertThat("cluster status is green", chr.getStatus(), equalTo(ClusterHealthStatus.GREEN));
+ @After
+ public void cleanupBolt() throws Exception {
+ bolt.cleanup();
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/28199890/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java
----------------------------------------------------------------------
diff --cc external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java
index a4f2db2,d2def5d..5860b3b
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java
@@@ -28,8 -27,10 +28,7 @@@ import org.junit.Test
import static org.mockito.Mockito.verify;
- public class EsIndexBoltTest extends AbstractEsBoltTest{
- private EsIndexBolt bolt;
+ public class EsIndexBoltTest extends AbstractEsBoltIntegrationTest<EsIndexBolt> {
- private static final Logger LOG = LoggerFactory.getLogger(EsIndexBoltTest.class);
@Test
public void testEsIndexBolt()
@@@ -59,7 -50,16 +48,17 @@@
.execute().actionGet();
Assert.assertEquals(1, resp.getCount());
+ }
+
+ private Tuple createTestTuple(String index, String type) {
+ String source = "{\"user\":\"user1\"}";
+ String id = "docId";
+ return EsTestUtil.generateTestTuple(source, index, type, id);
+ }
- bolt.cleanup();
+ @Override
+ protected EsIndexBolt createBolt(EsConfig esConfig) {
- return new EsIndexBolt(esConfig);
++ EsTupleMapper tupleMapper = EsTestUtil.generateDefaultTupleMapper();
++ return new EsIndexBolt(esConfig, tupleMapper);
}
- }
+ }
http://git-wip-us.apache.org/repos/asf/storm/blob/28199890/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java
----------------------------------------------------------------------
diff --cc external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java
index da80204,d68cc89..075aa35
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java
@@@ -28,8 -27,12 +28,13 @@@ import org.junit.Test
import static org.mockito.Mockito.verify;
import static org.mockito.Matchers.any;
- public class EsPercolateBoltTest extends AbstractEsBoltTest {
- private EsPercolateBolt bolt;
+ public class EsPercolateBoltTest extends AbstractEsBoltIntegrationTest<EsPercolateBolt> {
+
+ @Override
+ protected EsPercolateBolt createBolt(EsConfig esConfig) {
- return new EsPercolateBolt(esConfig);
++ EsTupleMapper tupleMapper = EsTestUtil.generateDefaultTupleMapper();
++ return new EsPercolateBolt(esConfig, tupleMapper);
+ }
@Test
public void testEsPercolateBolt()