You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2015/09/01 15:32:03 UTC
[01/10] storm git commit: Fix STORM-1013
Repository: storm
Updated Branches:
refs/heads/master bf5c35ba4 -> 436878128
Fix STORM-1013
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/6a54fdb2
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/6a54fdb2
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/6a54fdb2
Branch: refs/heads/master
Commit: 6a54fdb2e617723ca36a82258ba562fd7fadb29f
Parents: 8618eb3
Author: Alex Panov <al...@teradata.com>
Authored: Fri Aug 28 17:35:39 2015 +0200
Committer: Alex Panov <al...@teradata.com>
Committed: Fri Aug 28 17:35:39 2015 +0200
----------------------------------------------------------------------
external/storm-elasticsearch/pom.xml | 6 ++
.../elasticsearch/bolt/AbstractEsBolt.java | 10 ++-
.../elasticsearch/bolt/ElasticSearchClient.java | 57 --------------
.../storm/elasticsearch/bolt/EsLookupBolt.java | 10 +++
.../elasticsearch/bolt/TransportAddresses.java | 72 -----------------
.../storm/elasticsearch/common/EsConfig.java | 65 +++++++++++-----
.../common/StormElasticSearchClient.java | 48 ++++++++++++
.../common/TransportAddresses.java | 72 +++++++++++++++++
.../storm/elasticsearch/trident/EsState.java | 33 +++-----
.../elasticsearch/trident/EsStateFactory.java | 18 ++---
.../storm/elasticsearch/trident/EsUpdater.java | 2 +-
.../elasticsearch/bolt/AbstractEsBoltTest.java | 15 +++-
.../elasticsearch/bolt/EsIndexBoltTest.java | 5 ++
.../elasticsearch/bolt/EsIndexTopology.java | 4 +-
.../bolt/EsLookupBoltIntegrationTest.java | 5 ++
.../elasticsearch/bolt/EsLookupBoltTest.java | 5 ++
.../elasticsearch/bolt/EsPercolateBoltTest.java | 5 ++
.../bolt/TransportAddressesTest.java | 81 --------------------
.../elasticsearch/common/EsConfigTest.java | 54 +++++++++++++
.../elasticsearch/common/SniffSettings.java | 5 ++
.../common/TransportAddressesTest.java | 81 ++++++++++++++++++++
.../trident/TridentEsTopology.java | 4 +-
22 files changed, 381 insertions(+), 276 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/6a54fdb2/external/storm-elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/pom.xml b/external/storm-elasticsearch/pom.xml
index ecc1bd2..7777549 100644
--- a/external/storm-elasticsearch/pom.xml
+++ b/external/storm-elasticsearch/pom.xml
@@ -63,6 +63,12 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava-testlib</artifactId>
+ <version>${guava.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/storm/blob/6a54fdb2/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/AbstractEsBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/AbstractEsBolt.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/AbstractEsBolt.java
index a19a660..fa7356a 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/AbstractEsBolt.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/AbstractEsBolt.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ * <p>
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,6 +20,7 @@ package org.apache.storm.elasticsearch.bolt;
import java.util.Map;
import org.apache.storm.elasticsearch.common.EsConfig;
+import org.apache.storm.elasticsearch.common.StormElasticSearchClient;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
@@ -31,6 +32,8 @@ import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
+import static com.google.common.base.Preconditions.checkNotNull;
+
public abstract class AbstractEsBolt extends BaseRichBolt {
private static final Logger LOG = LoggerFactory.getLogger(AbstractEsBolt.class);
@@ -41,6 +44,7 @@ public abstract class AbstractEsBolt extends BaseRichBolt {
private EsConfig esConfig;
public AbstractEsBolt(EsConfig esConfig) {
+ checkNotNull(esConfig);
this.esConfig = esConfig;
}
@@ -50,7 +54,7 @@ public abstract class AbstractEsBolt extends BaseRichBolt {
this.collector = outputCollector;
synchronized (AbstractEsBolt.class) {
if (client == null) {
- client = new ElasticSearchClient(esConfig).construct();
+ client = new StormElasticSearchClient(esConfig).construct();
}
}
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/storm/blob/6a54fdb2/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/ElasticSearchClient.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/ElasticSearchClient.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/ElasticSearchClient.java
deleted file mode 100644
index 0a9f4ea..0000000
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/ElasticSearchClient.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.elasticsearch.bolt;
-
-import java.io.Serializable;
-
-import org.apache.storm.elasticsearch.common.EsConfig;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.client.transport.TransportClient;
-import org.elasticsearch.common.settings.ImmutableSettings;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.transport.InetSocketTransportAddress;
-
-final class ElasticSearchClient implements Serializable {
-
- private final EsConfig esConfig;
-
- ElasticSearchClient(EsConfig esConfig) {
- this.esConfig = esConfig;
- }
-
- Client construct() {
- Settings settings = createBasicSettings();
- TransportClient transportClient = new TransportClient(settings);
- addTransportAddresses(transportClient);
- return transportClient;
- }
-
- private Settings createBasicSettings() {
- return ImmutableSettings.settingsBuilder()
- .put("cluster.name", esConfig.getClusterName())
- .put("client.transport.sniff", "true")
- .build();
- }
-
- private void addTransportAddresses(TransportClient transportClient) {
- Iterable<InetSocketTransportAddress> transportAddresses = new TransportAddresses(esConfig.getNodes());
- for (InetSocketTransportAddress transportAddress : transportAddresses) {
- transportClient.addTransportAddress(transportAddress);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/6a54fdb2/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsLookupBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsLookupBolt.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsLookupBolt.java
index d9982f2..bd7178b 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsLookupBolt.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsLookupBolt.java
@@ -29,13 +29,23 @@ import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * @since 0.11
+ */
public class EsLookupBolt extends AbstractEsBolt {
private final ElasticsearchGetRequest getRequest;
private final EsLookupResultOutput output;
+ /**
+ * @throws NullPointerException if any of the parameters is null
+ */
public EsLookupBolt(EsConfig esConfig, ElasticsearchGetRequest getRequest, EsLookupResultOutput output) {
super(esConfig);
+ checkNotNull(getRequest);
+ checkNotNull(output);
this.getRequest = getRequest;
this.output = output;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/6a54fdb2/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/TransportAddresses.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/TransportAddresses.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/TransportAddresses.java
deleted file mode 100644
index f4479df..0000000
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/TransportAddresses.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.elasticsearch.bolt;
-
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.elasticsearch.common.transport.InetSocketTransportAddress;
-
-final class TransportAddresses implements Iterable<InetSocketTransportAddress> {
-
- static final String DELIMETER = ":";
-
- private final String[] nodes;
-
- TransportAddresses(String[] nodes) {
- if (nodes == null) {
- throw new IllegalArgumentException("Elasticsearch hosts cannot be null");
- }
- if (nodes.length == 0) {
- throw new IllegalArgumentException("At least one Elasticsearch host must be specified");
- }
-
- this.nodes = nodes;
- }
-
- @Override
- public Iterator<InetSocketTransportAddress> iterator() {
- List<InetSocketTransportAddress> result = new LinkedList<>();
-
- for (String node : nodes) {
- InetSocketTransportAddress transportAddress = transformToInetAddress(node);
- result.add(transportAddress);
- }
-
- return result.iterator();
- }
-
- private InetSocketTransportAddress transformToInetAddress(String node) {
- String[] hostAndPort = node.split(DELIMETER);
- if (hostAndPort.length != 2) {
- throw new IllegalArgumentException(
- "Incorrect Elasticsearch node format, should follow {host}" + DELIMETER + "{port} pattern");
- }
- String hostname = hostname(hostAndPort[0]);
- return new InetSocketTransportAddress(hostname, port(hostAndPort[1]));
- }
-
- private String hostname(String input) {
- return input.trim();
- }
-
- private int port(String input) {
- return Integer.parseInt(input.trim());
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/6a54fdb2/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java
index 0b57788..5724519 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ * <p>
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,37 +18,66 @@
package org.apache.storm.elasticsearch.common;
import java.io.Serializable;
+import java.util.Collections;
+import java.util.Map;
-public class EsConfig implements Serializable{
- private String clusterName;
- private String[] nodes;
+import com.google.common.collect.ImmutableMap;
- public EsConfig() {
- }
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.settings.Settings;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * @since 0.11
+ */
+public class EsConfig implements Serializable {
+
+ private final String clusterName;
+ private final String[] nodes;
+ private final Map<String, String> additionalConfiguration;
/**
* EsConfig Constructor to be used in EsIndexBolt, EsPercolateBolt and EsStateFactory
+ *
* @param clusterName Elasticsearch cluster name
- * @param nodes Elasticsearch addresses in host:port pattern string array
+ * @param nodes Elasticsearch addresses in host:port pattern string array
+ * @throws IllegalArgumentException if nodes are empty
+ * @throws NullPointerException on any of the fields being null
*/
public EsConfig(String clusterName, String[] nodes) {
- this.clusterName = clusterName;
- this.nodes = nodes;
+ this(clusterName, nodes, Collections.<String, String>emptyMap());
}
- public String getClusterName() {
- return clusterName;
- }
+ /**
+ * EsConfig Constructor to be used in EsIndexBolt, EsPercolateBolt and EsStateFactory
+ *
+ * @param clusterName Elasticsearch cluster name
+ * @param nodes Elasticsearch addresses in host:port pattern string array
+ * @param additionalConfiguration Additional Elasticsearch configuration
+ * @throws IllegalArgumentException if nodes are empty
+ * @throws NullPointerException on any of the fields being null
+ */
+ public EsConfig(String clusterName, String[] nodes, Map<String, String> additionalConfiguration) {
+ checkNotNull(clusterName);
+ checkNotNull(nodes);
+ checkNotNull(additionalConfiguration);
- public void setClusterName(String clusterName) {
+ checkArgument(nodes.length != 0, "Nodes cannot be empty");
this.clusterName = clusterName;
+ this.nodes = nodes;
+ this.additionalConfiguration = ImmutableMap.copyOf(additionalConfiguration);
}
- public String[] getNodes() {
- return nodes;
+ TransportAddresses getTransportAddresses() {
+ return new TransportAddresses(nodes);
}
- public void setNodes(String[] nodes) {
- this.nodes = nodes;
+ Settings toBasicSettings() {
+ return ImmutableSettings.settingsBuilder()
+ .put("cluster.name", clusterName)
+ .put(additionalConfiguration)
+ .build();
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/6a54fdb2/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/StormElasticSearchClient.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/StormElasticSearchClient.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/StormElasticSearchClient.java
new file mode 100644
index 0000000..3ebfe72
--- /dev/null
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/StormElasticSearchClient.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.elasticsearch.common;
+
+import java.io.Serializable;
+
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+
+public final class StormElasticSearchClient implements Serializable {
+
+ private final EsConfig esConfig;
+
+ public StormElasticSearchClient(EsConfig esConfig) {
+ this.esConfig = esConfig;
+ }
+
+ public Client construct() {
+ Settings settings = esConfig.toBasicSettings();
+ TransportClient transportClient = new TransportClient(settings);
+ addTransportAddresses(transportClient);
+ return transportClient;
+ }
+
+ private void addTransportAddresses(TransportClient transportClient) {
+ Iterable<InetSocketTransportAddress> transportAddresses = esConfig.getTransportAddresses();
+ for (InetSocketTransportAddress transportAddress : transportAddresses) {
+ transportClient.addTransportAddress(transportAddress);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/6a54fdb2/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/TransportAddresses.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/TransportAddresses.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/TransportAddresses.java
new file mode 100644
index 0000000..cd082a7
--- /dev/null
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/TransportAddresses.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.elasticsearch.common;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+
+final class TransportAddresses implements Iterable<InetSocketTransportAddress> {
+
+ static final String DELIMETER = ":";
+
+ private final String[] nodes;
+
+ TransportAddresses(String[] nodes) {
+ if (nodes == null) {
+ throw new IllegalArgumentException("Elasticsearch hosts cannot be null");
+ }
+ if (nodes.length == 0) {
+ throw new IllegalArgumentException("At least one Elasticsearch host must be specified");
+ }
+
+ this.nodes = nodes;
+ }
+
+ @Override
+ public Iterator<InetSocketTransportAddress> iterator() {
+ List<InetSocketTransportAddress> result = new LinkedList<>();
+
+ for (String node : nodes) {
+ InetSocketTransportAddress transportAddress = transformToInetAddress(node);
+ result.add(transportAddress);
+ }
+
+ return result.iterator();
+ }
+
+ private InetSocketTransportAddress transformToInetAddress(String node) {
+ String[] hostAndPort = node.split(DELIMETER);
+ if (hostAndPort.length != 2) {
+ throw new IllegalArgumentException(
+ "Incorrect Elasticsearch node format, should follow {host}" + DELIMETER + "{port} pattern");
+ }
+ String hostname = hostname(hostAndPort[0]);
+ return new InetSocketTransportAddress(hostname, port(hostAndPort[1]));
+ }
+
+ private String hostname(String input) {
+ return input.trim();
+ }
+
+ private int port(String input) {
+ return Integer.parseInt(input.trim());
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/6a54fdb2/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java
index e804084..e60c003 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java
@@ -17,27 +17,24 @@
*/
package org.apache.storm.elasticsearch.trident;
-import backtype.storm.task.IMetricsContext;
import backtype.storm.topology.FailedException;
+
+import org.apache.storm.elasticsearch.common.StormElasticSearchClient;
import org.apache.storm.elasticsearch.common.EsConfig;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.Client;
-import org.elasticsearch.client.transport.TransportClient;
-import org.elasticsearch.common.settings.ImmutableSettings;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import storm.trident.operation.TridentCollector;
import storm.trident.state.State;
import storm.trident.tuple.TridentTuple;
-import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
-public class EsState implements State {
+/**
+ * @since 0.11
+ */
+class EsState implements State {
private static final Logger LOG = LoggerFactory.getLogger(EsState.class);
private static Client client;
private EsConfig esConfig;
@@ -74,23 +71,11 @@ public class EsState implements State {
}
- public void prepare(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
+ public void prepare() {
try {
synchronized (EsState.class) {
if (client == null) {
- Settings settings =
- ImmutableSettings.settingsBuilder().put("cluster.name", esConfig.getClusterName())
- .put("client.transport.sniff", "true").build();
- List<InetSocketTransportAddress> transportAddressList = new ArrayList<InetSocketTransportAddress>();
- for (String node : esConfig.getNodes()) {
- String[] hostAndPort = node.split(":");
- if (hostAndPort.length != 2) {
- throw new IllegalArgumentException("incorrect Elasticsearch node format, should follow {host}:{port} pattern");
- }
- transportAddressList.add(new InetSocketTransportAddress(hostAndPort[0], Integer.parseInt(hostAndPort[1])));
- }
- client = new TransportClient(settings)
- .addTransportAddresses(transportAddressList.toArray(new InetSocketTransportAddress[transportAddressList.size()]));
+ client = new StormElasticSearchClient(esConfig).construct();
}
}
} catch (Exception e) {
@@ -98,7 +83,7 @@ public class EsState implements State {
}
}
- public void updateState(List<TridentTuple> tuples, TridentCollector collector) {
+ public void updateState(List<TridentTuple> tuples) {
BulkRequestBuilder bulkRequest = client.prepareBulk();
for (TridentTuple tuple : tuples) {
String source = tuple.getStringByField("source");
http://git-wip-us.apache.org/repos/asf/storm/blob/6a54fdb2/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java
index c3a2e6c..4f4dd39 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java
@@ -19,32 +19,30 @@ package org.apache.storm.elasticsearch.trident;
import backtype.storm.task.IMetricsContext;
import org.apache.storm.elasticsearch.common.EsConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
import storm.trident.state.State;
import storm.trident.state.StateFactory;
import java.util.Map;
-public class EsStateFactory implements StateFactory {
- private EsConfig esConfig;
-
- public EsStateFactory(){
-
- }
+/**
+ * @since 0.11
+ */
+class EsStateFactory implements StateFactory {
+ private final EsConfig esConfig;
/**
* EsStateFactory constructor
* @param esConfig Elasticsearch configuration containing node addresses and cluster name {@link EsConfig}
*/
- public EsStateFactory(EsConfig esConfig){
+ EsStateFactory(EsConfig esConfig){
this.esConfig = esConfig;
}
@Override
public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
EsState esState = new EsState(esConfig);
- esState.prepare(conf, metrics, partitionIndex, numPartitions);
+ esState.prepare();
return esState;
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/6a54fdb2/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsUpdater.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsUpdater.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsUpdater.java
index 6fa42f3..685ea8c 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsUpdater.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsUpdater.java
@@ -26,6 +26,6 @@ import java.util.List;
public class EsUpdater extends BaseStateUpdater<EsState> {
@Override
public void updateState(EsState state, List<TridentTuple> tuples, TridentCollector collector) {
- state.updateState(tuples, collector);
+ state.updateState(tuples);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/6a54fdb2/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltTest.java
index d9c09d0..8a5fad2 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltTest.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltTest.java
@@ -17,9 +17,12 @@
*/
package org.apache.storm.elasticsearch.bolt;
+import com.google.common.testing.NullPointerTester;
+
import org.apache.storm.elasticsearch.common.EsConfig;
import org.junit.After;
import org.junit.Before;
+import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
@@ -46,14 +49,18 @@ public abstract class AbstractEsBoltTest<Bolt extends AbstractEsBolt> {
protected abstract Bolt createBolt(EsConfig esConfig);
protected EsConfig esConfig() {
- EsConfig esConfig = new EsConfig();
- esConfig.setClusterName("test-cluster");
- esConfig.setNodes(new String[] {"127.0.0.1:9300"});
- return esConfig;
+ return new EsConfig("test-cluster", new String[] {"127.0.0.1:9300"});
}
@After
public void cleanupBolt() throws Exception {
bolt.cleanup();
}
+
+ @Test
+ public void constructorsThrowOnNull() throws Exception {
+ new NullPointerTester().setDefault(EsConfig.class, esConfig()).testAllPublicConstructors(getBoltClass());
+ }
+
+ protected abstract Class<Bolt> getBoltClass();
}
http://git-wip-us.apache.org/repos/asf/storm/blob/6a54fdb2/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java
index d2def5d..bcba6d4 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java
@@ -62,4 +62,9 @@ public class EsIndexBoltTest extends AbstractEsBoltIntegrationTest<EsIndexBolt>
protected EsIndexBolt createBolt(EsConfig esConfig) {
return new EsIndexBolt(esConfig);
}
+
+ @Override
+ protected Class<EsIndexBolt> getBoltClass() {
+ return EsIndexBolt.class;
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/6a54fdb2/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java
index fc9c178..3c1e949 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java
@@ -46,9 +46,7 @@ public class EsIndexTopology {
TopologyBuilder builder = new TopologyBuilder();
UserDataSpout spout = new UserDataSpout();
builder.setSpout(SPOUT_ID, spout, 1);
- EsConfig esConfig = new EsConfig();
- esConfig.setClusterName(EsConstants.clusterName);
- esConfig.setNodes(new String[]{"localhost:9300"});
+ EsConfig esConfig = new EsConfig(EsConstants.clusterName, new String[]{"localhost:9300"});
builder.setBolt(BOLT_ID, new EsIndexBolt(esConfig), 1).shuffleGrouping(SPOUT_ID);
EsTestUtil.startEsNode();
http://git-wip-us.apache.org/repos/asf/storm/blob/6a54fdb2/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsLookupBoltIntegrationTest.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsLookupBoltIntegrationTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsLookupBoltIntegrationTest.java
index c391688..d100b84 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsLookupBoltIntegrationTest.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsLookupBoltIntegrationTest.java
@@ -129,4 +129,9 @@ public class EsLookupBoltIntegrationTest extends AbstractEsBoltIntegrationTest<E
return new Fields("data");
}
}
+
+ @Override
+ protected Class<EsLookupBolt> getBoltClass() {
+ return EsLookupBolt.class;
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/6a54fdb2/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsLookupBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsLookupBoltTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsLookupBoltTest.java
index 70ea140..175b2a4 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsLookupBoltTest.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsLookupBoltTest.java
@@ -117,4 +117,9 @@ public class EsLookupBoltTest extends AbstractEsBoltTest<EsLookupBolt> {
assertThat(declaredFields.getValue(), is(fields));
}
+
+ @Override
+ protected Class<EsLookupBolt> getBoltClass() {
+ return EsLookupBolt.class;
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/6a54fdb2/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java
index d68cc89..c5ed91f 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java
@@ -52,4 +52,9 @@ public class EsPercolateBoltTest extends AbstractEsBoltIntegrationTest<EsPercola
verify(outputCollector).ack(tuple);
verify(outputCollector).emit(new Values(source, any(PercolateResponse.Match.class)));
}
+
+ @Override
+ protected Class<EsPercolateBolt> getBoltClass() {
+ return EsPercolateBolt.class;
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/6a54fdb2/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/TransportAddressesTest.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/TransportAddressesTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/TransportAddressesTest.java
deleted file mode 100644
index 411f187..0000000
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/TransportAddressesTest.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.elasticsearch.bolt;
-
-import org.elasticsearch.common.transport.InetSocketTransportAddress;
-import org.junit.Test;
-
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.junit.Assert.assertThat;
-
-public class TransportAddressesTest {
-
- @Test
- public void readsMultipleHosts() throws Exception {
- String[] hosts = new String[] {"h1:1000", "h2:10003"};
- TransportAddresses addresses = new TransportAddresses(hosts);
- assertThat(addresses, containsInAnyOrder(new InetSocketTransportAddress("h1", 1000),
- new InetSocketTransportAddress("h2", 10003)));
- }
-
- @Test
- public void stripsSpaces() throws Exception {
- String[] hosts = new String[] {"h1:1000", " h2:10003 "};
- TransportAddresses addresses = new TransportAddresses(hosts);
- assertThat(addresses, containsInAnyOrder(new InetSocketTransportAddress("h1", 1000),
- new InetSocketTransportAddress("h2", 10003)));
- }
-
- @Test
- public void readsOneHost() throws Exception {
- String[] hosts = new String[] {"h1:1000"};
- TransportAddresses addresses = new TransportAddresses(hosts);
- assertThat(addresses, containsInAnyOrder(new InetSocketTransportAddress("h1", 1000)));
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void throwsOnNullHosts() throws Exception {
- new TransportAddresses(null);
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void throwsOnEmptyArray() throws Exception {
- new TransportAddresses(new String[] {}).iterator();
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void throwsOnInvalidHostAndPortPair() throws Exception {
- new TransportAddresses(new String[] {"h1:1000", "h2"}).iterator();
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void throwsOnInvalidPortValue() throws Exception {
- new TransportAddresses(new String[] {"h1:-1000"}).iterator();
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void throwsOnPortNotANumber() throws Exception {
- new TransportAddresses(new String[] {"h1:dummy"}).iterator();
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void throwsOnInvalidHostAndPortFormat() throws Exception {
- new TransportAddresses(new String[] {"h1:dummy:231"}).iterator();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/6a54fdb2/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsConfigTest.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsConfigTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsConfigTest.java
new file mode 100644
index 0000000..de28940
--- /dev/null
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsConfigTest.java
@@ -0,0 +1,54 @@
+package org.apache.storm.elasticsearch.common;
+
+import java.util.Map;
+import java.util.UUID;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.testing.NullPointerTester;
+
+import org.elasticsearch.common.settings.Settings;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+public class EsConfigTest {
+
+ private String clusterName = "name";
+ private String[] nodes = new String[] {"localhost:9300"};
+
+ @Test(expected = IllegalArgumentException.class)
+ public void nodesCannotBeEmpty() throws Exception {
+ new EsConfig(clusterName, new String[] {});
+ }
+
+ @Test
+ public void settingsContainClusterName() throws Exception {
+ EsConfig esConfig = new EsConfig(clusterName, nodes);
+ assertThat(esConfig.toBasicSettings().get("cluster.name"), is(clusterName));
+ }
+
+ @Test
+ public void usesAdditionalConfiguration() throws Exception {
+ Map<String, String> additionalSettings = additionalSettings();
+ EsConfig esConfig = new EsConfig(clusterName, nodes, additionalSettings);
+ Settings settings = esConfig.toBasicSettings();
+ assertSettingsContainAllAdditionalValues(settings, additionalSettings);
+ }
+
+ private Map<String, String> additionalSettings() {
+ return ImmutableMap.of("client.transport.sniff", "true", UUID.randomUUID().toString(),
+ UUID.randomUUID().toString());
+ }
+
+ private void assertSettingsContainAllAdditionalValues(Settings settings, Map<String, String> additionalSettings) {
+ for (String key : additionalSettings.keySet()) {
+ assertThat(settings.get(key), is(additionalSettings.get(key)));
+ }
+ }
+
+ @Test
+ public void constructorThrowsOnNull() throws Exception {
+ new NullPointerTester().testAllPublicConstructors(EsConfig.class);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/6a54fdb2/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/SniffSettings.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/SniffSettings.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/SniffSettings.java
new file mode 100644
index 0000000..e2666ee
--- /dev/null
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/SniffSettings.java
@@ -0,0 +1,5 @@
+package org.apache.storm.elasticsearch.common;
+
+public final class SniffSettings {
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/6a54fdb2/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/TransportAddressesTest.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/TransportAddressesTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/TransportAddressesTest.java
new file mode 100644
index 0000000..f2e4936
--- /dev/null
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/TransportAddressesTest.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.elasticsearch.common;
+
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.junit.Test;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertThat;
+
+public class TransportAddressesTest {
+
+ @Test
+ public void readsMultipleHosts() throws Exception {
+ String[] hosts = new String[] {"h1:1000", "h2:10003"};
+ TransportAddresses addresses = new TransportAddresses(hosts);
+ assertThat(addresses, containsInAnyOrder(new InetSocketTransportAddress("h1", 1000),
+ new InetSocketTransportAddress("h2", 10003)));
+ }
+
+ @Test
+ public void stripsSpaces() throws Exception {
+ String[] hosts = new String[] {"h1:1000", " h2:10003 "};
+ TransportAddresses addresses = new TransportAddresses(hosts);
+ assertThat(addresses, containsInAnyOrder(new InetSocketTransportAddress("h1", 1000),
+ new InetSocketTransportAddress("h2", 10003)));
+ }
+
+ @Test
+ public void readsOneHost() throws Exception {
+ String[] hosts = new String[] {"h1:1000"};
+ TransportAddresses addresses = new TransportAddresses(hosts);
+ assertThat(addresses, containsInAnyOrder(new InetSocketTransportAddress("h1", 1000)));
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void throwsOnNullHosts() throws Exception {
+ new TransportAddresses(null);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void throwsOnEmptyArray() throws Exception {
+ new TransportAddresses(new String[] {}).iterator();
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void throwsOnInvalidHostAndPortPair() throws Exception {
+ new TransportAddresses(new String[] {"h1:1000", "h2"}).iterator();
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void throwsOnInvalidPortValue() throws Exception {
+ new TransportAddresses(new String[] {"h1:-1000"}).iterator();
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void throwsOnPortNotANumber() throws Exception {
+ new TransportAddresses(new String[] {"h1:dummy"}).iterator();
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void throwsOnInvalidHostAndPortFormat() throws Exception {
+ new TransportAddresses(new String[] {"h1:dummy:231"}).iterator();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/6a54fdb2/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java
index 2c951f8..1ae1df7 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java
@@ -45,9 +45,7 @@ public class TridentEsTopology {
TridentTopology topology = new TridentTopology();
Stream stream = topology.newStream("spout", spout);
- EsConfig esConfig = new EsConfig();
- esConfig.setClusterName(EsConstants.clusterName);
- esConfig.setNodes(new String[]{"localhost:9300"});
+ EsConfig esConfig = new EsConfig(EsConstants.clusterName, new String[]{"localhost:9300"});
Fields esFields = new Fields("index", "type", "source");
StateFactory factory = new EsStateFactory(esConfig);
TridentState state = stream.partitionPersist(factory, esFields, new EsUpdater(), new Fields());
[08/10] storm git commit: Remove guava leftovers
Posted by ka...@apache.org.
Remove guava leftovers
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/fff82fd8
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/fff82fd8
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/fff82fd8
Branch: refs/heads/master
Commit: fff82fd8d4e72993af61c2ca6725dcd4a9af2d1f
Parents: 4f53fc4
Author: Alex Panov <al...@teradata.com>
Authored: Mon Aug 31 17:31:44 2015 +0200
Committer: Alex Panov <al...@teradata.com>
Committed: Mon Aug 31 17:31:44 2015 +0200
----------------------------------------------------------------------
.../org/apache/storm/elasticsearch/common/EsConfig.java | 9 ++++-----
1 file changed, 4 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/fff82fd8/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java
index 6e9b128..6bbd81f 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java
@@ -19,10 +19,9 @@ package org.apache.storm.elasticsearch.common;
import java.io.Serializable;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
-import com.google.common.collect.ImmutableMap;
-
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
@@ -44,7 +43,7 @@ public class EsConfig implements Serializable {
* @param clusterName Elasticsearch cluster name
* @param nodes Elasticsearch addresses in host:port pattern string array
* @throws IllegalArgumentException if nodes are empty
- * @throws NullPointerException on any of the fields being null
+ * @throws NullPointerException on any of the fields being null
*/
public EsConfig(String clusterName, String[] nodes) {
this(clusterName, nodes, Collections.<String, String>emptyMap());
@@ -57,7 +56,7 @@ public class EsConfig implements Serializable {
* @param nodes Elasticsearch addresses in host:port pattern string array
* @param additionalConfiguration Additional Elasticsearch configuration
* @throws IllegalArgumentException if nodes are empty
- * @throws NullPointerException on any of the fields being null
+ * @throws NullPointerException on any of the fields being null
*/
public EsConfig(String clusterName, String[] nodes, Map<String, String> additionalConfiguration) {
checkNotNull(clusterName);
@@ -67,7 +66,7 @@ public class EsConfig implements Serializable {
checkArgument(nodes.length != 0, "Nodes cannot be empty");
this.clusterName = clusterName;
this.nodes = nodes;
- this.additionalConfiguration = ImmutableMap.copyOf(additionalConfiguration);
+ this.additionalConfiguration = new HashMap<>(additionalConfiguration);
}
TransportAddresses getTransportAddresses() {
[03/10] storm git commit: Switch to ES shaded Preconditions instead of guava
Posted by ka...@apache.org.
Switch to ES shaded Preconditions instead of guava
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/677d3c5d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/677d3c5d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/677d3c5d
Branch: refs/heads/master
Commit: 677d3c5d601ccd954982d8753fa0a67a21ca3694
Parents: ae0f814
Author: Alex Panov <al...@teradata.com>
Authored: Sun Aug 30 12:11:28 2015 +0200
Committer: Alex Panov <al...@teradata.com>
Committed: Sun Aug 30 12:11:28 2015 +0200
----------------------------------------------------------------------
.../java/org/apache/storm/elasticsearch/bolt/AbstractEsBolt.java | 2 +-
.../java/org/apache/storm/elasticsearch/bolt/EsIndexBolt.java | 4 +++-
.../java/org/apache/storm/elasticsearch/bolt/EsLookupBolt.java | 2 +-
.../org/apache/storm/elasticsearch/bolt/EsPercolateBolt.java | 4 +++-
.../java/org/apache/storm/elasticsearch/common/EsConfig.java | 4 ++--
.../java/org/apache/storm/elasticsearch/trident/EsState.java | 1 -
.../org/apache/storm/elasticsearch/trident/EsStateFactory.java | 3 +--
7 files changed, 11 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/677d3c5d/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/AbstractEsBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/AbstractEsBolt.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/AbstractEsBolt.java
index fa7356a..784a57f 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/AbstractEsBolt.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/AbstractEsBolt.java
@@ -32,7 +32,7 @@ import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
-import static com.google.common.base.Preconditions.checkNotNull;
+import static org.elasticsearch.common.base.Preconditions.checkNotNull;
public abstract class AbstractEsBolt extends BaseRichBolt {
http://git-wip-us.apache.org/repos/asf/storm/blob/677d3c5d/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexBolt.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexBolt.java
index 36b9d28..1c5983e 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexBolt.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexBolt.java
@@ -26,6 +26,8 @@ import org.apache.storm.elasticsearch.common.EsTupleMapper;
import java.util.Map;
+import static org.elasticsearch.common.base.Preconditions.checkNotNull;
+
/**
* Basic bolt for storing tuple to ES document.
*/
@@ -39,7 +41,7 @@ public class EsIndexBolt extends AbstractEsBolt {
*/
public EsIndexBolt(EsConfig esConfig, EsTupleMapper tupleMapper) {
super(esConfig);
- this.tupleMapper = tupleMapper;
+ this.tupleMapper = checkNotNull(tupleMapper);
}
@Override
http://git-wip-us.apache.org/repos/asf/storm/blob/677d3c5d/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsLookupBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsLookupBolt.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsLookupBolt.java
index bd7178b..1676a79 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsLookupBolt.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsLookupBolt.java
@@ -29,7 +29,7 @@ import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
-import static com.google.common.base.Preconditions.checkNotNull;
+import static org.elasticsearch.common.base.Preconditions.checkNotNull;
/**
* @since 0.11
http://git-wip-us.apache.org/repos/asf/storm/blob/677d3c5d/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsPercolateBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsPercolateBolt.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsPercolateBolt.java
index 27e5e00..a361464 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsPercolateBolt.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsPercolateBolt.java
@@ -30,6 +30,8 @@ import org.elasticsearch.action.percolate.PercolateSourceBuilder;
import java.util.Map;
+import static org.elasticsearch.common.base.Preconditions.checkNotNull;
+
/**
* Basic bolt for retrieve matched percolate queries.
*/
@@ -44,7 +46,7 @@ public class EsPercolateBolt extends AbstractEsBolt {
*/
public EsPercolateBolt(EsConfig esConfig, EsTupleMapper tupleMapper) {
super(esConfig);
- this.tupleMapper = tupleMapper;
+ this.tupleMapper = checkNotNull(tupleMapper);
}
@Override
http://git-wip-us.apache.org/repos/asf/storm/blob/677d3c5d/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java
index 5724519..6e9b128 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java
@@ -26,8 +26,8 @@ import com.google.common.collect.ImmutableMap;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
+import static org.elasticsearch.common.base.Preconditions.checkArgument;
+import static org.elasticsearch.common.base.Preconditions.checkNotNull;
/**
* @since 0.11
http://git-wip-us.apache.org/repos/asf/storm/blob/677d3c5d/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java
index 1eb0d04..c066553 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java
@@ -93,7 +93,6 @@ class EsState implements State {
*
* @param tuples list of tuples for storing to ES.
* Each tuple should have relevant fields (source, index, type, id) for EsState's tupleMapper to extract ES document.
- * @param collector
*/
public void updateState(List<TridentTuple> tuples) {
BulkRequestBuilder bulkRequest = client.prepareBulk();
http://git-wip-us.apache.org/repos/asf/storm/blob/677d3c5d/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java
index e5733e7..6904298 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java
@@ -32,8 +32,7 @@ import java.util.Map;
*/
class EsStateFactory implements StateFactory {
private final EsConfig esConfig;
- private EsConfig esConfig;
- private EsTupleMapper tupleMapper;
+ private final EsTupleMapper tupleMapper;
/**
* EsStateFactory constructor
[09/10] storm git commit: Merge branch 'master' of https://github.com/alexpanov/storm into STORM-1013
Posted by ka...@apache.org.
Merge branch 'master' of https://github.com/alexpanov/storm into STORM-1013
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/25aecf17
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/25aecf17
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/25aecf17
Branch: refs/heads/master
Commit: 25aecf17002b04c43ac258f3ef027f67a3de60df
Parents: bf5c35b fff82fd
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Tue Sep 1 22:22:45 2015 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Tue Sep 1 22:22:45 2015 +0900
----------------------------------------------------------------------
external/storm-elasticsearch/README.md | 37 ++++-----
external/storm-elasticsearch/pom.xml | 6 ++
.../elasticsearch/bolt/AbstractEsBolt.java | 10 ++-
.../elasticsearch/bolt/ElasticSearchClient.java | 57 --------------
.../storm/elasticsearch/bolt/EsIndexBolt.java | 4 +-
.../storm/elasticsearch/bolt/EsLookupBolt.java | 10 +++
.../elasticsearch/bolt/EsPercolateBolt.java | 4 +-
.../elasticsearch/bolt/TransportAddresses.java | 72 -----------------
.../storm/elasticsearch/common/EsConfig.java | 64 +++++++++++-----
.../common/StormElasticSearchClient.java | 48 ++++++++++++
.../common/TransportAddresses.java | 72 +++++++++++++++++
.../storm/elasticsearch/trident/EsState.java | 32 ++------
.../elasticsearch/trident/EsStateFactory.java | 20 ++---
.../storm/elasticsearch/trident/EsUpdater.java | 2 +-
.../elasticsearch/bolt/AbstractEsBoltTest.java | 15 +++-
.../elasticsearch/bolt/EsIndexBoltTest.java | 5 ++
.../elasticsearch/bolt/EsIndexTopology.java | 4 +-
.../bolt/EsLookupBoltIntegrationTest.java | 5 ++
.../elasticsearch/bolt/EsLookupBoltTest.java | 5 ++
.../elasticsearch/bolt/EsPercolateBoltTest.java | 5 ++
.../bolt/TransportAddressesTest.java | 81 --------------------
.../elasticsearch/common/EsConfigTest.java | 71 +++++++++++++++++
.../common/TransportAddressesTest.java | 81 ++++++++++++++++++++
.../trident/EsStateFactoryTest.java | 32 ++++++++
.../trident/TridentEsTopology.java | 6 +-
25 files changed, 450 insertions(+), 298 deletions(-)
----------------------------------------------------------------------
[06/10] storm git commit: Apply suggested changes in the pull request: - Add Apache License to EsConfigTest; - Remove unused class; - Change README
Posted by ka...@apache.org.
Apply suggested changes in the pull request:
- Add Apache License to EsConfigTest;
- Remove unused class;
- Change README
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1469c647
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1469c647
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1469c647
Branch: refs/heads/master
Commit: 1469c647799c2d9d644c40e078fc0d533f568699
Parents: 189ba57
Author: Alex Panov <al...@teradata.com>
Authored: Mon Aug 31 10:55:24 2015 +0200
Committer: Alex Panov <al...@teradata.com>
Committed: Mon Aug 31 10:55:24 2015 +0200
----------------------------------------------------------------------
external/storm-elasticsearch/README.md | 2 +-
.../storm/elasticsearch/common/EsConfigTest.java | 17 +++++++++++++++++
.../storm/elasticsearch/common/SniffSettings.java | 5 -----
3 files changed, 18 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/1469c647/external/storm-elasticsearch/README.md
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/README.md b/external/storm-elasticsearch/README.md
index 06a350c..2c52531 100644
--- a/external/storm-elasticsearch/README.md
+++ b/external/storm-elasticsearch/README.md
@@ -84,7 +84,7 @@ EsConfig esConfig = new EsConfig(clusterName, new String[]{"localhost:9300"}, ad
|--- |--- |---
|clusterName | Elasticsearch cluster name | String (required) |
|nodes | Elasticsearch nodes in a String array, each element should follow {host}:{port} pattern | String array (required) |
-|additionalParameters | Additional Elasticsearch configuration parameters | Map<String, String> (optional) |
+|additionalParameters | Additional Elasticsearch Transport Client configuration parameters | Map<String, String> (optional) |
## EsTupleMapper (org.apache.storm.elasticsearch.common.EsTupleMapper)
http://git-wip-us.apache.org/repos/asf/storm/blob/1469c647/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsConfigTest.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsConfigTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsConfigTest.java
index de28940..d50337f 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsConfigTest.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsConfigTest.java
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.storm.elasticsearch.common;
import java.util.Map;
http://git-wip-us.apache.org/repos/asf/storm/blob/1469c647/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/SniffSettings.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/SniffSettings.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/SniffSettings.java
deleted file mode 100644
index e2666ee..0000000
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/SniffSettings.java
+++ /dev/null
@@ -1,5 +0,0 @@
-package org.apache.storm.elasticsearch.common;
-
-public final class SniffSettings {
-
-}
[07/10] storm git commit: Add Apache Licence to EsStateFactoryTest
Posted by ka...@apache.org.
Add Apache Licence to EsStateFactoryTest
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4f53fc4c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4f53fc4c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4f53fc4c
Branch: refs/heads/master
Commit: 4f53fc4c7171c832b53ce9c6606d0cc7da208037
Parents: 1469c64
Author: Alex Panov <al...@teradata.com>
Authored: Mon Aug 31 12:13:47 2015 +0200
Committer: Alex Panov <al...@teradata.com>
Committed: Mon Aug 31 12:13:47 2015 +0200
----------------------------------------------------------------------
.../elasticsearch/trident/EsStateFactoryTest.java | 17 +++++++++++++++++
1 file changed, 17 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/4f53fc4c/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/EsStateFactoryTest.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/EsStateFactoryTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/EsStateFactoryTest.java
index ce9eaf3..e2eb13e 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/EsStateFactoryTest.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/EsStateFactoryTest.java
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.storm.elasticsearch.trident;
import com.google.common.testing.NullPointerTester;
[05/10] storm git commit: Update storm-elasticsearch documentation
Posted by ka...@apache.org.
Update storm-elasticsearch documentation
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/189ba57e
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/189ba57e
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/189ba57e
Branch: refs/heads/master
Commit: 189ba57eb11854c1cd3db7b54ad8dd341c534b58
Parents: 499f891
Author: Alex Panov <al...@teradata.com>
Authored: Sun Aug 30 12:22:07 2015 +0200
Committer: Alex Panov <al...@teradata.com>
Committed: Sun Aug 30 12:22:07 2015 +0200
----------------------------------------------------------------------
external/storm-elasticsearch/README.md | 37 +++++++++++++++--------------
1 file changed, 19 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/189ba57e/external/storm-elasticsearch/README.md
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/README.md b/external/storm-elasticsearch/README.md
index a4792c7..06a350c 100644
--- a/external/storm-elasticsearch/README.md
+++ b/external/storm-elasticsearch/README.md
@@ -11,9 +11,7 @@ Users should make sure that ```EsTupleMapper``` can extract "source", "index", "
"source" is a document in JSON format string that will be indexed in Elasticsearch.
```java
-EsConfig esConfig = new EsConfig();
-esConfig.setClusterName(clusterName);
-esConfig.setNodes(new String[]{"localhost:9300"});
+EsConfig esConfig = new EsConfig(clusterName, new String[]{"localhost:9300"});
EsTupleMapper tupleMapper = new DefaultEsTupleMapper();
EsIndexBolt indexBolt = new EsIndexBolt(esConfig, tupleMapper);
```
@@ -26,9 +24,7 @@ User should make sure ```EsTupleMapper``` can extract "source", "index", "type"
"source" is a document in JSON format string that will be sent in percolate request to Elasticsearch.
```java
-EsConfig esConfig = new EsConfig();
-esConfig.setClusterName(clusterName);
-esConfig.setNodes(new String[]{"localhost:9300"});
+EsConfig esConfig = new EsConfig(clusterName, new String[]{"localhost:9300"});
EsTupleMapper tupleMapper = new DefaultEsTupleMapper();
EsPercolateBolt percolateBolt = new EsPercolateBolt(esConfig, tupleMapper);
```
@@ -40,14 +36,12 @@ for each Percolate.Match in PercolateResponse.
Elasticsearch Trident state also follows similar pattern to EsBolts. It takes in EsConfig and EsTupleMapper as an arg.
-```code
- EsConfig esConfig = new EsConfig();
- esConfig.setClusterName(clusterName);
- esConfig.setNodes(new String[]{"localhost:9300"});
- EsTupleMapper tupleMapper = new DefaultEsTupleMapper();
+```java
+EsConfig esConfig = new EsConfig(clusterName, new String[]{"localhost:9300"});
+EsTupleMapper tupleMapper = new DefaultEsTupleMapper();
- StateFactory factory = new EsStateFactory(esConfig, tupleMapper);
- TridentState state = stream.partitionPersist(factory, esFields, new EsUpdater(), new Fields());
+StateFactory factory = new EsStateFactory(esConfig, tupleMapper);
+TridentState state = stream.partitionPersist(factory, esFields, new EsUpdater(), new Fields());
```
## EsLookupBolt (org.apache.storm.elasticsearch.bolt.EsLookupBolt)
@@ -72,11 +66,17 @@ EsLookupBolt lookupBolt = new EsLookupBolt(esConfig, getRequestAdapter, output);
Provided components (Bolt, State) takes in EsConfig as a constructor arg.
- ```java
- EsConfig esConfig = new EsConfig();
- esConfig.setClusterName(clusterName);
- esConfig.setNodes(new String[]{"localhost:9300"});
- ```
+```java
+EsConfig esConfig = new EsConfig(clusterName, new String[]{"localhost:9300"});
+```
+
+or
+
+```java
+Map<String, String> additionalParameters = new HashMap<>();
+additionalParameters.put("client.transport.sniff", "true");
+EsConfig esConfig = new EsConfig(clusterName, new String[]{"localhost:9300"}, additionalParameters);
+```
### EsConfig params
@@ -84,6 +84,7 @@ Provided components (Bolt, State) takes in EsConfig as a constructor arg.
|--- |--- |---
|clusterName | Elasticsearch cluster name | String (required) |
|nodes | Elasticsearch nodes in a String array, each element should follow {host}:{port} pattern | String array (required) |
+|additionalParameters | Additional Elasticsearch configuration parameters | Map<String, String> (optional) |
## EsTupleMapper (org.apache.storm.elasticsearch.common.EsTupleMapper)
[04/10] storm git commit: Make EsStateFactory public, add tests
Posted by ka...@apache.org.
Make EsStateFactory public, add tests
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/499f8911
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/499f8911
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/499f8911
Branch: refs/heads/master
Commit: 499f891153c575b46f6b69a233f8e9e99e2183e4
Parents: 677d3c5
Author: Alex Panov <al...@teradata.com>
Authored: Sun Aug 30 12:14:10 2015 +0200
Committer: Alex Panov <al...@teradata.com>
Committed: Sun Aug 30 12:14:10 2015 +0200
----------------------------------------------------------------------
.../storm/elasticsearch/trident/EsStateFactory.java | 10 ++++++----
.../elasticsearch/trident/EsStateFactoryTest.java | 15 +++++++++++++++
2 files changed, 21 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/499f8911/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java
index 6904298..e85cdc2 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java
@@ -26,11 +26,13 @@ import storm.trident.state.StateFactory;
import java.util.Map;
+import static org.elasticsearch.common.base.Preconditions.checkNotNull;
+
/**
* StateFactory for providing EsState.
* @since 0.11
*/
-class EsStateFactory implements StateFactory {
+public class EsStateFactory implements StateFactory {
private final EsConfig esConfig;
private final EsTupleMapper tupleMapper;
@@ -39,9 +41,9 @@ class EsStateFactory implements StateFactory {
* @param esConfig Elasticsearch configuration containing node addresses and cluster name {@link EsConfig}
* @param tupleMapper Tuple to ES document mapper {@link EsTupleMapper}
*/
- EsStateFactory(EsConfig esConfig, EsTupleMapper tupleMapper){
- this.esConfig = esConfig;
- this.tupleMapper = tupleMapper;
+ public EsStateFactory(EsConfig esConfig, EsTupleMapper tupleMapper) {
+ this.esConfig = checkNotNull(esConfig);
+ this.tupleMapper = checkNotNull(tupleMapper);
}
@Override
http://git-wip-us.apache.org/repos/asf/storm/blob/499f8911/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/EsStateFactoryTest.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/EsStateFactoryTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/EsStateFactoryTest.java
new file mode 100644
index 0000000..ce9eaf3
--- /dev/null
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/EsStateFactoryTest.java
@@ -0,0 +1,15 @@
+package org.apache.storm.elasticsearch.trident;
+
+import com.google.common.testing.NullPointerTester;
+
+import org.apache.storm.elasticsearch.common.EsConfig;
+import org.junit.Test;
+
+public class EsStateFactoryTest {
+
+ @Test
+ public void constructorThrowsOnNull() throws Exception {
+ new NullPointerTester().setDefault(EsConfig.class, new EsConfig("cluster", new String[] {"localhost:9300"}))
+ .testAllPublicConstructors(EsStateFactory.class);
+ }
+}
[10/10] storm git commit: add STORM-1013 to CHANGELOG.md
Posted by ka...@apache.org.
add STORM-1013 to CHANGELOG.md
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/43687812
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/43687812
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/43687812
Branch: refs/heads/master
Commit: 436878128310afda4519e625c5db0339b889b811
Parents: 25aecf1
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Tue Sep 1 22:31:30 2015 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Tue Sep 1 22:31:30 2015 +0900
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/43687812/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index a917e3a..dce6010 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 0.11.0
+ * STORM-1013: [storm-elasticsearch] Expose TransportClient configuration Map to EsConfig
* STORM-1012: Shading jackson.
* STORM-974: [storm-elasticsearch] Introduces Tuple -> ES document mapper to get rid of constant field mapping (source, index, type)
* STORM-978: [storm-elasticsearch] Introduces Lookup(or Query)Bolt which emits matched documents from ES
[02/10] storm git commit: Merge remote-tracking branch 'upstream/master'
Posted by ka...@apache.org.
Merge remote-tracking branch 'upstream/master'
# Conflicts:
# external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java
# external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java
# external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltTest.java
# external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java
# external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ae0f814d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ae0f814d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ae0f814d
Branch: refs/heads/master
Commit: ae0f814d589f718c4f989adffccea9b24bbfd3cf
Parents: 6a54fdb 07e0ff2
Author: Alex Panov <al...@teradata.com>
Authored: Sun Aug 30 11:48:37 2015 +0200
Committer: Alex Panov <al...@teradata.com>
Committed: Sun Aug 30 11:48:37 2015 +0200
----------------------------------------------------------------------
.gitignore | 2 +-
CHANGELOG.md | 11 +-
README.markdown | 4 +
STORM-UI-REST-API.md | 41 +-
conf/defaults.yaml | 7 +-
.../nimbus_ha_leader_election_and_failover.png | Bin 0 -> 154316 bytes
.../images/nimbus_ha_topology_submission.png | Bin 0 -> 134180 bytes
docs/documentation/nimbus-ha-design.md | 217 +++++
external/storm-elasticsearch/README.md | 48 +-
.../storm/elasticsearch/bolt/EsIndexBolt.java | 23 +-
.../elasticsearch/bolt/EsPercolateBolt.java | 24 +-
.../common/DefaultEsTupleMapper.java | 42 +
.../elasticsearch/common/EsTupleMapper.java | 55 ++
.../storm/elasticsearch/trident/EsState.java | 22 +-
.../elasticsearch/trident/EsStateFactory.java | 10 +-
.../storm/elasticsearch/trident/EsUpdater.java | 4 +
.../elasticsearch/bolt/AbstractEsBoltTest.java | 5 +-
.../elasticsearch/bolt/EsIndexBoltTest.java | 7 +-
.../elasticsearch/bolt/EsIndexTopology.java | 4 +-
.../elasticsearch/bolt/EsPercolateBoltTest.java | 4 +-
.../storm/elasticsearch/common/EsTestUtil.java | 5 +
.../trident/TridentEsTopology.java | 4 +-
external/storm-hbase/pom.xml | 2 +-
external/storm-hdfs/pom.xml | 4 +-
.../ha/codedistributor/HDFSCodeDistributor.java | 101 +++
external/storm-kafka/README.md | 3 +
.../src/jvm/storm/kafka/KafkaSpout.java | 6 +-
.../src/jvm/storm/kafka/PartitionManager.java | 10 +-
.../src/jvm/storm/kafka/SpoutConfig.java | 3 +
external/storm-solr/README.md | 201 +++++
external/storm-solr/pom.xml | 98 +++
.../apache/storm/solr/bolt/SolrUpdateBolt.java | 136 ++++
.../storm/solr/config/CountBasedCommit.java | 59 ++
.../storm/solr/config/SolrCommitStrategy.java | 30 +
.../apache/storm/solr/config/SolrConfig.java | 42 +
.../storm/solr/mapper/SolrFieldsMapper.java | 182 +++++
.../storm/solr/mapper/SolrJsonMapper.java | 160 ++++
.../apache/storm/solr/mapper/SolrMapper.java | 32 +
.../storm/solr/mapper/SolrMapperException.java | 24 +
.../org/apache/storm/solr/schema/CopyField.java | 50 ++
.../org/apache/storm/solr/schema/Field.java | 50 ++
.../org/apache/storm/solr/schema/FieldType.java | 63 ++
.../org/apache/storm/solr/schema/Schema.java | 116 +++
.../storm/solr/schema/SolrFieldTypeFinder.java | 182 +++++
.../schema/builder/RestJsonSchemaBuilder.java | 69 ++
.../solr/schema/builder/SchemaBuilder.java | 27 +
.../apache/storm/solr/trident/SolrState.java | 67 ++
.../storm/solr/trident/SolrStateFactory.java | 44 +
.../apache/storm/solr/trident/SolrUpdater.java | 33 +
.../storm/solr/spout/SolrFieldsSpout.java | 76 ++
.../apache/storm/solr/spout/SolrJsonSpout.java | 120 +++
.../storm/solr/topology/SolrFieldsTopology.java | 56 ++
.../storm/solr/topology/SolrJsonTopology.java | 48 ++
.../storm/solr/topology/SolrTopology.java | 82 ++
.../solr/trident/SolrFieldsTridentTopology.java | 45 ++
.../solr/trident/SolrJsonTridentTopology.java | 45 ++
.../org/apache/storm/solr/util/TestUtil.java | 30 +
pom.xml | 19 +-
storm-core/pom.xml | 79 +-
storm-core/src/clj/backtype/storm/cluster.clj | 94 ++-
.../backtype/storm/command/shell_submission.clj | 9 +-
storm-core/src/clj/backtype/storm/config.clj | 15 +-
.../src/clj/backtype/storm/daemon/nimbus.clj | 252 ++++--
.../clj/backtype/storm/daemon/supervisor.clj | 53 +-
storm-core/src/clj/backtype/storm/thrift.clj | 23 +-
storm-core/src/clj/backtype/storm/timer.clj | 7 +-
storm-core/src/clj/backtype/storm/ui/core.clj | 95 ++-
storm-core/src/clj/backtype/storm/zookeeper.clj | 108 ++-
storm-core/src/jvm/backtype/storm/Config.java | 47 +-
.../storm/codedistributor/ICodeDistributor.java | 56 ++
.../LocalFileSystemCodeDistributor.java | 106 +++
.../storm/generated/ClusterSummary.java | 292 ++++---
.../backtype/storm/generated/NimbusSummary.java | 796 +++++++++++++++++++
.../backtype/storm/generated/TopologyInfo.java | 221 +++--
.../storm/generated/TopologySummary.java | 107 ++-
.../backtype/storm/nimbus/ILeaderElector.java | 60 ++
.../jvm/backtype/storm/nimbus/NimbusInfo.java | 93 +++
.../jvm/backtype/storm/utils/NimbusClient.java | 78 +-
.../jvm/backtype/storm/utils/RotatingMap.java | 12 +-
.../jvm/backtype/storm/utils/TimeCacheMap.java | 60 +-
.../src/jvm/backtype/storm/utils/Utils.java | 9 +
storm-core/src/py/storm/ttypes.py | 613 ++++++++------
storm-core/src/storm.thrift | 12 +-
storm-core/src/ui/public/index.html | 21 +
.../public/templates/index-page-template.html | 59 +-
.../templates/topology-page-template.html | 18 +-
storm-core/src/ui/public/topology.html | 35 +-
.../test/clj/backtype/storm/cluster_test.clj | 23 +-
.../storm/messaging/netty_unit_test.clj | 2 +-
.../test/clj/backtype/storm/nimbus_test.clj | 210 +++--
.../backtype/storm/security/auth/auth_test.clj | 4 +-
.../storm/security/auth/nimbus_auth_test.clj | 14 +-
.../test/clj/backtype/storm/supervisor_test.clj | 1 +
.../test/clj/backtype/storm/utils_test.clj | 12 -
storm-dist/binary/src/main/assembly/binary.xml | 14 +
95 files changed, 5721 insertions(+), 843 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/ae0f814d/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java
----------------------------------------------------------------------
diff --cc external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java
index e60c003,e3865e5..1eb0d04
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java
@@@ -17,10 -17,10 +17,11 @@@
*/
package org.apache.storm.elasticsearch.trident;
-import backtype.storm.task.IMetricsContext;
import backtype.storm.topology.FailedException;
+
+import org.apache.storm.elasticsearch.common.StormElasticSearchClient;
import org.apache.storm.elasticsearch.common.EsConfig;
+ import org.apache.storm.elasticsearch.common.EsTupleMapper;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.Client;
@@@ -29,12 -33,15 +30,13 @@@ import org.slf4j.LoggerFactory
import storm.trident.state.State;
import storm.trident.tuple.TridentTuple;
-import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
/**
+ * Trident State for storing tuple to ES document.
+ * @since 0.11
*/
-public class EsState implements State {
+class EsState implements State {
private static final Logger LOG = LoggerFactory.getLogger(EsState.class);
private static Client client;
private EsConfig esConfig;
@@@ -83,13 -105,20 +88,20 @@@
}
}
+ /**
+ * Store current state to ElasticSearch.
+ *
+ * @param tuples list of tuples for storing to ES.
+ * Each tuple should have relevant fields (source, index, type, id) for EsState's tupleMapper to extract ES document.
+ * @param collector
+ */
- public void updateState(List<TridentTuple> tuples, TridentCollector collector) {
+ public void updateState(List<TridentTuple> tuples) {
BulkRequestBuilder bulkRequest = client.prepareBulk();
for (TridentTuple tuple : tuples) {
- String source = tuple.getStringByField("source");
- String index = tuple.getStringByField("index");
- String type = tuple.getStringByField("type");
- String id = tuple.getStringByField("id");
+ String source = tupleMapper.getSource(tuple);
+ String index = tupleMapper.getIndex(tuple);
+ String type = tupleMapper.getType(tuple);
+ String id = tupleMapper.getId(tuple);
bulkRequest.add(client.prepareIndex(index, type, id).setSource(source));
}
http://git-wip-us.apache.org/repos/asf/storm/blob/ae0f814d/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java
----------------------------------------------------------------------
diff --cc external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java
index 4f4dd39,9fdf7c6..e5733e7
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java
@@@ -19,30 -19,37 +19,36 @@@ package org.apache.storm.elasticsearch.
import backtype.storm.task.IMetricsContext;
import org.apache.storm.elasticsearch.common.EsConfig;
+ import org.apache.storm.elasticsearch.common.EsTupleMapper;
+
import storm.trident.state.State;
import storm.trident.state.StateFactory;
import java.util.Map;
/**
+ * StateFactory for providing EsState.
+ * @since 0.11
*/
-public class EsStateFactory implements StateFactory {
+class EsStateFactory implements StateFactory {
+ private final EsConfig esConfig;
+ private EsConfig esConfig;
+ private EsTupleMapper tupleMapper;
- public EsStateFactory(){
-
- }
-
/**
* EsStateFactory constructor
* @param esConfig Elasticsearch configuration containing node addresses and cluster name {@link EsConfig}
+ * @param tupleMapper Tuple to ES document mapper {@link EsTupleMapper}
*/
- EsStateFactory(EsConfig esConfig){
- public EsStateFactory(EsConfig esConfig, EsTupleMapper tupleMapper){
++ EsStateFactory(EsConfig esConfig, EsTupleMapper tupleMapper){
this.esConfig = esConfig;
+ this.tupleMapper = tupleMapper;
}
@Override
public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
- EsState esState = new EsState(esConfig);
+ EsState esState = new EsState(esConfig, tupleMapper);
- esState.prepare(conf, metrics, partitionIndex, numPartitions);
+ esState.prepare();
return esState;
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/ae0f814d/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsUpdater.java
----------------------------------------------------------------------
diff --cc external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsUpdater.java
index 685ea8c,935c92e..1fb998b
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsUpdater.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsUpdater.java
@@@ -24,8 -24,12 +24,12 @@@ import storm.trident.tuple.TridentTuple
import java.util.List;
public class EsUpdater extends BaseStateUpdater<EsState> {
+ /**
+ * {@inheritDoc}
+ * Each tuple should have relevant fields (source, index, type, id) for EsState's tupleMapper to extract ES document.
+ */
@Override
public void updateState(EsState state, List<TridentTuple> tuples, TridentCollector collector) {
- state.updateState(tuples, collector);
+ state.updateState(tuples);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/ae0f814d/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltTest.java
----------------------------------------------------------------------
diff --cc external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltTest.java
index 8a5fad2,17ccdc9..07b7c43
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltTest.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltTest.java
@@@ -17,8 -17,8 +17,10 @@@
*/
package org.apache.storm.elasticsearch.bolt;
+import com.google.common.testing.NullPointerTester;
+
+ import backtype.storm.Config;
+ import backtype.storm.task.OutputCollector;
import org.apache.storm.elasticsearch.common.EsConfig;
import org.junit.After;
import org.junit.Before;
http://git-wip-us.apache.org/repos/asf/storm/blob/ae0f814d/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java
----------------------------------------------------------------------
diff --cc external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java
index bcba6d4,5860b3b..ba87616
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java
@@@ -60,11 -58,7 +58,12 @@@ public class EsIndexBoltTest extends Ab
@Override
protected EsIndexBolt createBolt(EsConfig esConfig) {
- return new EsIndexBolt(esConfig);
+ EsTupleMapper tupleMapper = EsTestUtil.generateDefaultTupleMapper();
+ return new EsIndexBolt(esConfig, tupleMapper);
}
+
+ @Override
+ protected Class<EsIndexBolt> getBoltClass() {
+ return EsIndexBolt.class;
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/ae0f814d/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java
----------------------------------------------------------------------
diff --cc external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java
index 3c1e949,1f0118b..70e0738
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java
@@@ -46,8 -47,11 +47,9 @@@ public class EsIndexTopology
TopologyBuilder builder = new TopologyBuilder();
UserDataSpout spout = new UserDataSpout();
builder.setSpout(SPOUT_ID, spout, 1);
- EsConfig esConfig = new EsConfig();
- esConfig.setClusterName(EsConstants.clusterName);
- esConfig.setNodes(new String[]{"localhost:9300"});
+ EsTupleMapper tupleMapper = EsTestUtil.generateDefaultTupleMapper();
+ EsConfig esConfig = new EsConfig(EsConstants.clusterName, new String[]{"localhost:9300"});
- builder.setBolt(BOLT_ID, new EsIndexBolt(esConfig), 1).shuffleGrouping(SPOUT_ID);
+ builder.setBolt(BOLT_ID, new EsIndexBolt(esConfig, tupleMapper), 1).shuffleGrouping(SPOUT_ID);
EsTestUtil.startEsNode();
EsTestUtil.waitForSeconds(5);
http://git-wip-us.apache.org/repos/asf/storm/blob/ae0f814d/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/ae0f814d/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java
----------------------------------------------------------------------
diff --cc external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java
index 1ae1df7,ee5e607..45d86f8
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java
@@@ -45,9 -46,12 +46,10 @@@ public class TridentEsTopology
TridentTopology topology = new TridentTopology();
Stream stream = topology.newStream("spout", spout);
- EsConfig esConfig = new EsConfig();
- esConfig.setClusterName(EsConstants.clusterName);
- esConfig.setNodes(new String[]{"localhost:9300"});
- Fields esFields = new Fields("index", "type", "source", "id");
+ EsConfig esConfig = new EsConfig(EsConstants.clusterName, new String[]{"localhost:9300"});
+ Fields esFields = new Fields("index", "type", "source");
- StateFactory factory = new EsStateFactory(esConfig);
+ EsTupleMapper tupleMapper = EsTestUtil.generateDefaultTupleMapper();
+ StateFactory factory = new EsStateFactory(esConfig, tupleMapper);
TridentState state = stream.partitionPersist(factory, esFields, new EsUpdater(), new Fields());
EsTestUtil.startEsNode();