You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2015/08/28 00:32:31 UTC

[6/7] storm git commit: Merge branch 'master' into STORM-974

Merge branch 'master' into STORM-974

Conflicts:
	external/storm-elasticsearch/README.md
	external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltTest.java
	external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java
	external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/28199890
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/28199890
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/28199890

Branch: refs/heads/master
Commit: 2819989001fe0352aa65201192358cdc69fc9356
Parents: 40472e0 4faee0e
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Fri Aug 28 07:31:00 2015 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Fri Aug 28 07:31:00 2015 +0900

----------------------------------------------------------------------
 .gitignore                                      |   2 +-
 CHANGELOG.md                                    |  19 +
 README.markdown                                 |   8 +
 SECURITY.md                                     |   2 +-
 STORM-UI-REST-API.md                            |  41 +-
 TODO                                            | 178 -----
 bin/storm-config.cmd                            |  18 +-
 bin/storm.py                                    |  22 +-
 conf/defaults.yaml                              |  10 +-
 dev-tools/travis/travis-script.sh               |   2 +-
 docs/documentation/FAQ.md                       |   7 +-
 .../nimbus_ha_leader_election_and_failover.png  | Bin 0 -> 154316 bytes
 .../images/nimbus_ha_topology_submission.png    | Bin 0 -> 134180 bytes
 docs/documentation/nimbus-ha-design.md          | 217 +++++
 examples/storm-starter/pom.xml                  |  29 +
 .../starter/trident/TridentKafkaWordCount.java  | 230 ++++++
 .../flux-core/src/test/resources/log4j2.xml     |  34 +
 .../flux-core/src/test/resources/logback.xml    |  30 -
 external/storm-elasticsearch/README.md          |  20 +-
 external/storm-elasticsearch/pom.xml            |  10 +-
 .../elasticsearch/ElasticsearchGetRequest.java  |  36 +
 .../elasticsearch/EsLookupResultOutput.java     |  43 +
 .../elasticsearch/bolt/AbstractEsBolt.java      |  50 +-
 .../elasticsearch/bolt/ElasticSearchClient.java |  57 ++
 .../storm/elasticsearch/bolt/EsLookupBolt.java  |  71 ++
 .../elasticsearch/bolt/TransportAddresses.java  |  72 ++
 .../bolt/AbstractEsBoltIntegrationTest.java     |  91 +++
 .../elasticsearch/bolt/AbstractEsBoltTest.java  |  75 +-
 .../elasticsearch/bolt/EsIndexBoltTest.java     |  33 +-
 .../bolt/EsLookupBoltIntegrationTest.java       | 132 +++
 .../elasticsearch/bolt/EsLookupBoltTest.java    | 120 +++
 .../elasticsearch/bolt/EsPercolateBoltTest.java |  26 +-
 .../bolt/TransportAddressesTest.java            |  81 ++
 external/storm-hbase/pom.xml                    |   2 +-
 external/storm-hdfs/README.md                   |   9 +
 external/storm-hdfs/pom.xml                     |  15 +-
 .../ha/codedistributor/HDFSCodeDistributor.java | 101 +++
 .../apache/storm/hdfs/trident/HdfsState.java    | 392 +++++++--
 .../trident/rotation/FileRotationPolicy.java    |  14 +
 .../rotation/FileSizeRotationPolicy.java        |  13 +
 .../hdfs/trident/rotation/NoRotationPolicy.java |  10 +
 .../trident/rotation/TimedRotationPolicy.java   |  31 +-
 .../storm/hdfs/trident/HdfsStateTest.java       | 206 +++++
 external/storm-hive/README.md                   |   3 +-
 .../org/apache/storm/hive/bolt/HiveBolt.java    |  39 +-
 .../apache/storm/hive/common/HiveOptions.java   |  11 +
 .../apache/storm/hive/bolt/TestHiveBolt.java    |  12 +-
 external/storm-jdbc/README.md                   |   6 +-
 .../storm/jdbc/trident/state/JdbcState.java     |   2 +-
 .../UserPersistanceTridentTopology.java         |   2 +-
 external/storm-kafka/CHANGELOG.md               |  13 -
 external/storm-kafka/README.md                  |   3 +
 .../src/jvm/storm/kafka/KafkaSpout.java         |   6 +-
 .../src/jvm/storm/kafka/PartitionManager.java   |  10 +-
 .../src/jvm/storm/kafka/SpoutConfig.java        |   3 +
 external/storm-solr/README.md                   | 201 +++++
 external/storm-solr/pom.xml                     |  98 +++
 .../apache/storm/solr/bolt/SolrUpdateBolt.java  | 136 ++++
 .../storm/solr/config/CountBasedCommit.java     |  59 ++
 .../storm/solr/config/SolrCommitStrategy.java   |  30 +
 .../apache/storm/solr/config/SolrConfig.java    |  42 +
 .../storm/solr/mapper/SolrFieldsMapper.java     | 182 +++++
 .../storm/solr/mapper/SolrJsonMapper.java       | 160 ++++
 .../apache/storm/solr/mapper/SolrMapper.java    |  32 +
 .../storm/solr/mapper/SolrMapperException.java  |  24 +
 .../org/apache/storm/solr/schema/CopyField.java |  50 ++
 .../org/apache/storm/solr/schema/Field.java     |  50 ++
 .../org/apache/storm/solr/schema/FieldType.java |  63 ++
 .../org/apache/storm/solr/schema/Schema.java    | 116 +++
 .../storm/solr/schema/SolrFieldTypeFinder.java  | 182 +++++
 .../schema/builder/RestJsonSchemaBuilder.java   |  69 ++
 .../solr/schema/builder/SchemaBuilder.java      |  27 +
 .../apache/storm/solr/trident/SolrState.java    |  67 ++
 .../storm/solr/trident/SolrStateFactory.java    |  44 +
 .../apache/storm/solr/trident/SolrUpdater.java  |  33 +
 .../storm/solr/spout/SolrFieldsSpout.java       |  76 ++
 .../apache/storm/solr/spout/SolrJsonSpout.java  | 120 +++
 .../storm/solr/topology/SolrFieldsTopology.java |  56 ++
 .../storm/solr/topology/SolrJsonTopology.java   |  48 ++
 .../storm/solr/topology/SolrTopology.java       |  82 ++
 .../solr/trident/SolrFieldsTridentTopology.java |  45 ++
 .../solr/trident/SolrJsonTridentTopology.java   |  45 ++
 .../org/apache/storm/solr/util/TestUtil.java    |  30 +
 pom.xml                                         |  19 +-
 storm-core/pom.xml                              |  79 +-
 storm-core/src/clj/backtype/storm/cluster.clj   |  94 ++-
 .../backtype/storm/command/shell_submission.clj |   9 +-
 storm-core/src/clj/backtype/storm/config.clj    |  15 +-
 .../src/clj/backtype/storm/daemon/logviewer.clj |   2 +-
 .../src/clj/backtype/storm/daemon/nimbus.clj    | 252 ++++--
 .../clj/backtype/storm/daemon/supervisor.clj    |  66 +-
 storm-core/src/clj/backtype/storm/thrift.clj    |  23 +-
 storm-core/src/clj/backtype/storm/timer.clj     |   7 +-
 storm-core/src/clj/backtype/storm/ui/core.clj   |  95 ++-
 storm-core/src/clj/backtype/storm/zookeeper.clj | 108 ++-
 storm-core/src/jvm/backtype/storm/Config.java   |  69 +-
 .../jvm/backtype/storm/ConfigValidation.java    |  13 +-
 .../storm/codedistributor/ICodeDistributor.java |  56 ++
 .../LocalFileSystemCodeDistributor.java         | 106 +++
 .../storm/generated/ClusterSummary.java         | 292 ++++---
 .../backtype/storm/generated/NimbusSummary.java | 796 +++++++++++++++++++
 .../backtype/storm/generated/TopologyInfo.java  | 221 +++--
 .../storm/generated/TopologySummary.java        | 107 ++-
 .../backtype/storm/nimbus/ILeaderElector.java   |  60 ++
 .../jvm/backtype/storm/nimbus/NimbusInfo.java   |  93 +++
 .../security/auth/SaslTransportPlugin.java      |   6 +
 .../kerberos/KerberosSaslTransportPlugin.java   |   5 +-
 .../security/auth/kerberos/NoOpTTrasport.java   |  40 +
 .../src/jvm/backtype/storm/task/ShellBolt.java  |   8 +-
 .../jvm/backtype/storm/utils/NimbusClient.java  |  78 +-
 .../jvm/backtype/storm/utils/RotatingMap.java   |  12 +-
 .../jvm/backtype/storm/utils/TimeCacheMap.java  |  60 +-
 .../src/jvm/backtype/storm/utils/Utils.java     |   9 +
 storm-core/src/py/storm/ttypes.py               | 613 ++++++++------
 storm-core/src/storm.thrift                     |  12 +-
 storm-core/src/ui/public/index.html             |  21 +
 .../public/templates/index-page-template.html   |  59 +-
 .../templates/topology-page-template.html       |  18 +-
 storm-core/src/ui/public/topology.html          |  35 +-
 .../test/clj/backtype/storm/cluster_test.clj    |  23 +-
 .../test/clj/backtype/storm/config_test.clj     |   4 +-
 .../storm/messaging/netty_unit_test.clj         |   2 +-
 .../test/clj/backtype/storm/nimbus_test.clj     | 210 +++--
 .../backtype/storm/security/auth/auth_test.clj  |   4 +-
 .../storm/security/auth/nimbus_auth_test.clj    |  14 +-
 .../test/clj/backtype/storm/supervisor_test.clj |   1 +
 .../test/clj/backtype/storm/utils_test.clj      |  12 -
 storm-dist/binary/src/main/assembly/binary.xml  |  14 +
 128 files changed, 7411 insertions(+), 1267 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/28199890/external/storm-elasticsearch/README.md
----------------------------------------------------------------------
diff --cc external/storm-elasticsearch/README.md
index 24355db,1b18019..a4792c7
--- a/external/storm-elasticsearch/README.md
+++ b/external/storm-elasticsearch/README.md
@@@ -36,23 -34,27 +36,41 @@@ EsPercolateBolt percolateBolt = new EsP
  If there exists non-empty percolate response, EsPercolateBolt will emit tuple with original source and Percolate.Match
  for each Percolate.Match in PercolateResponse.
  
 +## EsState (org.apache.storm.elasticsearch.trident.EsState)
 +
 +Elasticsearch Trident state also follows a similar pattern to EsBolts. It takes in EsConfig and EsTupleMapper as args.
 +
 +```code
 +   EsConfig esConfig = new EsConfig();
 +   esConfig.setClusterName(clusterName);
 +   esConfig.setNodes(new String[]{"localhost:9300"});
 +   EsTupleMapper tupleMapper = new DefaultEsTupleMapper();
 +
 +   StateFactory factory = new EsStateFactory(esConfig, tupleMapper);
 +   TridentState state = stream.partitionPersist(factory, esFields, new EsUpdater(), new Fields());
 + ```
 +
+ ## EsLookupBolt (org.apache.storm.elasticsearch.bolt.EsLookupBolt)
+ 
+ EsLookupBolt performs a get request to Elasticsearch. 
+ In order to do that, three dependencies need to be satisfied. Apart from the usual EsConfig, two other dependencies must be provided:
+     ElasticsearchGetRequest is used to convert the incoming Tuple to the GetRequest that will be executed against Elasticsearch.
+     EsLookupResultOutput is used to declare the output fields and convert the GetResponse to values that are emitted by the bolt.
+ 
+ The incoming tuple is passed to the provided GetRequest creator and the result of that execution is passed to the Elasticsearch client.
+ The bolt then uses the provided output adapter (EsLookupResultOutput) to convert the GetResponse to Values to emit.
+ The output fields are also specified by the user of the bolt via the output adapter (EsLookupResultOutput).
+ 
+ ```java
+ EsConfig esConfig = createEsConfig();
+ ElasticsearchGetRequest getRequestAdapter = createElasticsearchGetRequest();
+ EsLookupResultOutput output = createOutput();
+ EsLookupBolt lookupBolt = new EsLookupBolt(esConfig, getRequestAdapter, output);
+ ```
+ 
  ## EsConfig (org.apache.storm.elasticsearch.common.EsConfig)
    
 -Two bolts above takes in EsConfig as a constructor arg.
 +Provided components (Bolt, State) take in EsConfig as a constructor arg.
  
    ```java
     EsConfig esConfig = new EsConfig();

http://git-wip-us.apache.org/repos/asf/storm/blob/28199890/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltTest.java
----------------------------------------------------------------------
diff --cc external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltTest.java
index 0ad549d,d9c09d0..17ccdc9
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltTest.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltTest.java
@@@ -17,63 -17,43 +17,42 @@@
   */
  package org.apache.storm.elasticsearch.bolt;
  
 +import backtype.storm.Config;
 +import backtype.storm.task.OutputCollector;
- import org.apache.commons.io.FileUtils;
- import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
- import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
- import org.elasticsearch.client.Requests;
- import org.elasticsearch.cluster.ClusterName;
- import org.elasticsearch.cluster.metadata.IndexMetaData;
- import org.elasticsearch.common.Priority;
- import org.elasticsearch.common.settings.ImmutableSettings;
- import org.elasticsearch.common.unit.TimeValue;
- import org.elasticsearch.common.util.concurrent.EsExecutors;
- import org.elasticsearch.node.Node;
- import org.elasticsearch.node.NodeBuilder;
- import org.junit.AfterClass;
- import org.junit.BeforeClass;
+ import org.apache.storm.elasticsearch.common.EsConfig;
+ import org.junit.After;
+ import org.junit.Before;
+ import org.junit.runner.RunWith;
+ import org.mockito.Mock;
+ import org.mockito.runners.MockitoJUnitRunner;
  
- import java.io.File;
 -import backtype.storm.Config;
 -import backtype.storm.task.OutputCollector;
 -
+ @RunWith(MockitoJUnitRunner.class)
+ public abstract class AbstractEsBoltTest<Bolt extends AbstractEsBolt> {
  
- import static org.hamcrest.CoreMatchers.equalTo;
- import static org.hamcrest.MatcherAssert.assertThat;
- import static org.mockito.Mockito.mock;
- 
- public class AbstractEsBoltTest {
      protected static Config config = new Config();
-     protected static OutputCollector collector = mock(OutputCollector.class);
-     protected static Node node;
  
-     @BeforeClass
-     public static void setup() throws Exception {
-         node = NodeBuilder.nodeBuilder().data(true).settings(
-                 ImmutableSettings.builder()
-                         .put(ClusterName.SETTING, "test-cluster")
-                         .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
-                         .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
-                         .put(EsExecutors.PROCESSORS, 1)
-                         .put("http.enabled", false)
-                         .put("index.percolator.map_unmapped_fields_as_string", true)
-                         .put("index.store.type", "memory")
-         ).build();
-         node.start();
-         ensureEsGreen(node);
-         ClusterHealthResponse chr = node.client().admin().cluster()
-                 .health(Requests.clusterHealthRequest().timeout(TimeValue.timeValueSeconds(30)).waitForGreenStatus().waitForRelocatingShards(0)).actionGet();
-         Thread.sleep(1000);
+     @Mock
+     protected OutputCollector outputCollector;
+ 
+     protected Bolt bolt;
+ 
+     @Before
+     public void createBolt() throws Exception {
+         bolt = createBolt(esConfig());
+         bolt.prepare(config, null, outputCollector);
      }
  
-     @AfterClass
-     public static void cleanup() throws Exception {
-         node.stop();
-         node.close();
-         FileUtils.deleteDirectory(new File("./data"));
+     protected abstract Bolt createBolt(EsConfig esConfig);
+ 
+     protected EsConfig esConfig() {
+         EsConfig esConfig = new EsConfig();
+         esConfig.setClusterName("test-cluster");
+         esConfig.setNodes(new String[] {"127.0.0.1:9300"});
+         return esConfig;
      }
  
-     private static void ensureEsGreen(Node node) {
-         ClusterHealthResponse chr = node.client().admin().cluster()
-                 .health(Requests.clusterHealthRequest().timeout(TimeValue.timeValueSeconds(30)).waitForGreenStatus().waitForEvents(Priority.LANGUID).waitForRelocatingShards(0)).actionGet();
-         assertThat("cluster status is green", chr.getStatus(), equalTo(ClusterHealthStatus.GREEN));
+     @After
+     public void cleanupBolt() throws Exception {
+         bolt.cleanup();
      }
  }

http://git-wip-us.apache.org/repos/asf/storm/blob/28199890/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java
----------------------------------------------------------------------
diff --cc external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java
index a4f2db2,d2def5d..5860b3b
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java
@@@ -28,8 -27,10 +28,7 @@@ import org.junit.Test
  
  import static org.mockito.Mockito.verify;
  
- public class EsIndexBoltTest extends AbstractEsBoltTest{
-     private EsIndexBolt bolt;
+ public class EsIndexBoltTest extends AbstractEsBoltIntegrationTest<EsIndexBolt> {
 -    private static final Logger LOG = LoggerFactory.getLogger(EsIndexBoltTest.class);
  
      @Test
      public void testEsIndexBolt()
@@@ -59,7 -50,16 +48,17 @@@
                  .execute().actionGet();
  
          Assert.assertEquals(1, resp.getCount());
+     }
+ 
+     private Tuple createTestTuple(String index, String type) {
+         String source = "{\"user\":\"user1\"}";
+         String id = "docId";
+         return EsTestUtil.generateTestTuple(source, index, type, id);
+     }
  
-         bolt.cleanup();
+     @Override
+     protected EsIndexBolt createBolt(EsConfig esConfig) {
 -        return new EsIndexBolt(esConfig);
++        EsTupleMapper tupleMapper = EsTestUtil.generateDefaultTupleMapper();
++        return new EsIndexBolt(esConfig, tupleMapper);
      }
- }
+ }

http://git-wip-us.apache.org/repos/asf/storm/blob/28199890/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java
----------------------------------------------------------------------
diff --cc external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java
index da80204,d68cc89..075aa35
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java
@@@ -28,8 -27,12 +28,13 @@@ import org.junit.Test
  import static org.mockito.Mockito.verify;
  import static org.mockito.Matchers.any;
  
- public class EsPercolateBoltTest extends AbstractEsBoltTest {
-     private EsPercolateBolt bolt;
+ public class EsPercolateBoltTest extends AbstractEsBoltIntegrationTest<EsPercolateBolt> {
+ 
+     @Override
+     protected EsPercolateBolt createBolt(EsConfig esConfig) {
 -        return  new EsPercolateBolt(esConfig);
++        EsTupleMapper tupleMapper = EsTestUtil.generateDefaultTupleMapper();
++        return new EsPercolateBolt(esConfig, tupleMapper);
+     }
  
      @Test
      public void testEsPercolateBolt()