You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2015/08/28 00:12:48 UTC
[04/13] storm git commit: Introduce a unit test superclass for ES bolts
Introduce a unit test superclass for ES bolts
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3094baf2
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3094baf2
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3094baf2
Branch: refs/heads/master
Commit: 3094baf200beb513d98cea3a3ce94605e6760fe2
Parents: 3400c7d
Author: Alex Panov <al...@teradata.com>
Authored: Thu Aug 20 12:29:08 2015 +0200
Committer: Alex Panov <al...@teradata.com>
Committed: Thu Aug 20 12:29:08 2015 +0200
----------------------------------------------------------------------
.../bolt/AbstractEsBoltIntegrationTest.java | 91 ++++++++++++++++++++
.../elasticsearch/bolt/AbstractEsBoltTest.java | 76 ++++++----------
.../elasticsearch/bolt/EsIndexBoltTest.java | 33 ++++---
.../elasticsearch/bolt/EsPercolateBoltTest.java | 30 +++----
4 files changed, 143 insertions(+), 87 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/3094baf2/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltIntegrationTest.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltIntegrationTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltIntegrationTest.java
new file mode 100644
index 0000000..9bed459
--- /dev/null
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltIntegrationTest.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.elasticsearch.bolt;
+
+import java.io.File;
+
+import org.apache.commons.io.FileUtils;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
+import org.elasticsearch.client.Requests;
+import org.elasticsearch.cluster.ClusterName;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.common.Priority;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.util.concurrent.EsExecutors;
+import org.elasticsearch.node.Node;
+import org.elasticsearch.node.NodeBuilder;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+public abstract class AbstractEsBoltIntegrationTest<Bolt extends AbstractEsBolt> extends AbstractEsBoltTest<Bolt> {
+
+ protected static Node node;
+
+ @BeforeClass
+ public static void startElasticSearchNode() throws Exception {
+ node = NodeBuilder.nodeBuilder().data(true).settings(createSettings()).build();
+ node.start();
+ ensureEsGreen(node);
+ ClusterHealthResponse clusterHealth = node.client()
+ .admin()
+ .cluster()
+ .health(Requests.clusterHealthRequest()
+ .timeout(TimeValue.timeValueSeconds(30))
+ .waitForGreenStatus()
+ .waitForRelocatingShards(0))
+ .actionGet();
+ Thread.sleep(1000);
+ }
+
+ private static ImmutableSettings.Builder createSettings() {
+ return ImmutableSettings.builder()
+ .put(ClusterName.SETTING, "test-cluster")
+ .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
+ .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
+ .put(EsExecutors.PROCESSORS, 1)
+ .put("http.enabled", false)
+ .put("index.percolator.map_unmapped_fields_as_string", true)
+ .put("index.store.type", "memory");
+ }
+
+ @AfterClass
+ public static void closeElasticSearchNode() throws Exception {
+ node.stop();
+ node.close();
+ FileUtils.deleteDirectory(new File("./data"));
+ }
+
+ private static void ensureEsGreen(Node node) {
+ ClusterHealthResponse chr = node.client()
+ .admin()
+ .cluster()
+ .health(Requests.clusterHealthRequest()
+ .timeout(TimeValue.timeValueSeconds(30))
+ .waitForGreenStatus()
+ .waitForEvents(Priority.LANGUID)
+ .waitForRelocatingShards(0))
+ .actionGet();
+ assertThat("cluster status is green", chr.getStatus(), equalTo(ClusterHealthStatus.GREEN));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/3094baf2/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltTest.java
index ae6b321..8c5ea55 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltTest.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltTest.java
@@ -17,65 +17,43 @@
*/
package org.apache.storm.elasticsearch.bolt;
-import backtype.storm.Config;
-import backtype.storm.task.OutputCollector;
-import org.apache.commons.io.FileUtils;
-import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
-import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
-import org.elasticsearch.client.Requests;
-import org.elasticsearch.cluster.ClusterName;
-import org.elasticsearch.cluster.metadata.IndexMetaData;
-import org.elasticsearch.common.Priority;
-import org.elasticsearch.common.settings.ImmutableSettings;
-import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.common.util.concurrent.EsExecutors;
-import org.elasticsearch.node.Node;
-import org.elasticsearch.node.NodeBuilder;
+import org.apache.storm.elasticsearch.common.EsConfig;
import org.junit.After;
-import org.junit.AfterClass;
import org.junit.Before;
-import org.junit.BeforeClass;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
-import java.io.File;
+import backtype.storm.Config;
+import backtype.storm.task.OutputCollector;
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.mockito.Mockito.mock;
+@RunWith(MockitoJUnitRunner.class)
+abstract class AbstractEsBoltTest<Bolt extends AbstractEsBolt> {
-public class AbstractEsBoltTest {
protected static Config config = new Config();
- protected static OutputCollector collector = mock(OutputCollector.class);
- protected static Node node;
- @BeforeClass
- public static void setup() throws Exception {
- node = NodeBuilder.nodeBuilder().data(true).settings(
- ImmutableSettings.builder()
- .put(ClusterName.SETTING, "test-cluster")
- .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
- .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
- .put(EsExecutors.PROCESSORS, 1)
- .put("http.enabled", false)
- .put("index.percolator.map_unmapped_fields_as_string", true)
- .put("index.store.type", "memory")
- ).build();
- node.start();
- ensureEsGreen(node);
- ClusterHealthResponse chr = node.client().admin().cluster()
- .health(Requests.clusterHealthRequest().timeout(TimeValue.timeValueSeconds(30)).waitForGreenStatus().waitForRelocatingShards(0)).actionGet();
- Thread.sleep(1000);
+ @Mock
+ protected OutputCollector outputCollector;
+
+ protected Bolt bolt;
+
+ @Before
+ public void createBolt() throws Exception {
+ bolt = createBolt(esConfig());
+ bolt.prepare(config, null, outputCollector);
}
- @AfterClass
- public static void cleanup() throws Exception {
- node.stop();
- node.close();
- FileUtils.deleteDirectory(new File("./data"));
+ protected abstract Bolt createBolt(EsConfig esConfig);
+
+ protected EsConfig esConfig() {
+ EsConfig esConfig = new EsConfig();
+ esConfig.setClusterName("test-cluster");
+ esConfig.setNodes(new String[] {"127.0.0.1:9300"});
+ return esConfig;
}
- private static void ensureEsGreen(Node node) {
- ClusterHealthResponse chr = node.client().admin().cluster()
- .health(Requests.clusterHealthRequest().timeout(TimeValue.timeValueSeconds(30)).waitForGreenStatus().waitForEvents(Priority.LANGUID).waitForRelocatingShards(0)).actionGet();
- assertThat("cluster status is green", chr.getStatus(), equalTo(ClusterHealthStatus.GREEN));
+ @After
+ public void cleanupBolt() throws Exception {
+ bolt.cleanup();
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/3094baf2/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java
index dd4b088..d2def5d 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java
@@ -20,10 +20,7 @@ package org.apache.storm.elasticsearch.bolt;
import backtype.storm.tuple.Tuple;
import org.apache.storm.elasticsearch.common.EsConfig;
import org.apache.storm.elasticsearch.common.EsTestUtil;
-import org.elasticsearch.action.count.CountRequest;
-import org.elasticsearch.action.count.CountRequestBuilder;
import org.elasticsearch.action.count.CountResponse;
-import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.junit.Assert;
import org.junit.Test;
@@ -32,29 +29,20 @@ import org.slf4j.LoggerFactory;
import static org.mockito.Mockito.verify;
-public class EsIndexBoltTest extends AbstractEsBoltTest{
+public class EsIndexBoltTest extends AbstractEsBoltIntegrationTest<EsIndexBolt> {
private static final Logger LOG = LoggerFactory.getLogger(EsIndexBoltTest.class);
- private EsIndexBolt bolt;
@Test
public void testEsIndexBolt()
throws Exception {
- EsConfig esConfig = new EsConfig();
- esConfig.setClusterName("test-cluster");
- esConfig.setNodes(new String[]{"127.0.0.1:9300"});
-
- bolt = new EsIndexBolt(esConfig);
- bolt.prepare(config, null, collector);
-
- String source = "{\"user\":\"user1\"}";
String index = "index1";
String type = "type1";
- String id = "docId";
- Tuple tuple = EsTestUtil.generateTestTuple(source, index, type, id);
+
+ Tuple tuple = createTestTuple(index, type);
bolt.execute(tuple);
- verify(collector).ack(tuple);
+ verify(outputCollector).ack(tuple);
node.client().admin().indices().prepareRefresh(index).execute().actionGet();
CountResponse resp = node.client().prepareCount(index)
@@ -62,7 +50,16 @@ public class EsIndexBoltTest extends AbstractEsBoltTest{
.execute().actionGet();
Assert.assertEquals(1, resp.getCount());
+ }
+
+ private Tuple createTestTuple(String index, String type) {
+ String source = "{\"user\":\"user1\"}";
+ String id = "docId";
+ return EsTestUtil.generateTestTuple(source, index, type, id);
+ }
- bolt.cleanup();
+ @Override
+ protected EsIndexBolt createBolt(EsConfig esConfig) {
+ return new EsIndexBolt(esConfig);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/3094baf2/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java
index fd4fa4f..d68cc89 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java
@@ -21,35 +21,27 @@ import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import org.apache.storm.elasticsearch.common.EsConfig;
import org.apache.storm.elasticsearch.common.EsTestUtil;
-import org.elasticsearch.action.count.CountResponse;
import org.elasticsearch.action.percolate.PercolateResponse;
-import org.elasticsearch.index.query.TermQueryBuilder;
-import org.junit.Assert;
import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import static org.mockito.Mockito.verify;
import static org.mockito.Matchers.any;
-public class EsPercolateBoltTest extends AbstractEsBoltTest {
- private static final Logger LOG = LoggerFactory.getLogger(EsIndexBoltTest.class);
- private EsPercolateBolt bolt;
+public class EsPercolateBoltTest extends AbstractEsBoltIntegrationTest<EsPercolateBolt> {
+
+ @Override
+ protected EsPercolateBolt createBolt(EsConfig esConfig) {
+ return new EsPercolateBolt(esConfig);
+ }
@Test
public void testEsPercolateBolt()
throws Exception {
- EsConfig esConfig = new EsConfig();
- esConfig.setClusterName("test-cluster");
- esConfig.setNodes(new String[]{"localhost:9300"});
- bolt = new EsPercolateBolt(esConfig);
- bolt.prepare(config, null, collector);
-
String source = "{\"user\":\"user1\"}";
String index = "index1";
String type = ".percolator";
- node.client().prepareIndex("index1",".percolator")
+ node.client().prepareIndex("index1", ".percolator")
.setId("1")
.setSource("{\"query\":{\"match\":{\"user\":\"user1\"}}}").
execute().actionGet();
@@ -57,9 +49,7 @@ public class EsPercolateBoltTest extends AbstractEsBoltTest {
bolt.execute(tuple);
- verify(collector).ack(tuple);
- verify(collector).emit(new Values(source, any(PercolateResponse.Match.class)));
-
- bolt.cleanup();
+ verify(outputCollector).ack(tuple);
+ verify(outputCollector).emit(new Values(source, any(PercolateResponse.Match.class)));
}
-}
\ No newline at end of file
+}