You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2015/08/28 00:12:48 UTC

[04/13] storm git commit: Introduce a unit test superclass for ES bolts

Introduce a unit test superclass for ES bolts


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3094baf2
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3094baf2
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3094baf2

Branch: refs/heads/master
Commit: 3094baf200beb513d98cea3a3ce94605e6760fe2
Parents: 3400c7d
Author: Alex Panov <al...@teradata.com>
Authored: Thu Aug 20 12:29:08 2015 +0200
Committer: Alex Panov <al...@teradata.com>
Committed: Thu Aug 20 12:29:08 2015 +0200

----------------------------------------------------------------------
 .../bolt/AbstractEsBoltIntegrationTest.java     | 91 ++++++++++++++++++++
 .../elasticsearch/bolt/AbstractEsBoltTest.java  | 76 ++++++----------
 .../elasticsearch/bolt/EsIndexBoltTest.java     | 33 ++++---
 .../elasticsearch/bolt/EsPercolateBoltTest.java | 30 +++----
 4 files changed, 143 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/3094baf2/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltIntegrationTest.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltIntegrationTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltIntegrationTest.java
new file mode 100644
index 0000000..9bed459
--- /dev/null
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltIntegrationTest.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.elasticsearch.bolt;
+
+import java.io.File;
+
+import org.apache.commons.io.FileUtils;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
+import org.elasticsearch.client.Requests;
+import org.elasticsearch.cluster.ClusterName;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.common.Priority;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.util.concurrent.EsExecutors;
+import org.elasticsearch.node.Node;
+import org.elasticsearch.node.NodeBuilder;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+public abstract class AbstractEsBoltIntegrationTest<Bolt extends AbstractEsBolt> extends AbstractEsBoltTest<Bolt> {
+
+    protected static Node node;
+
+    @BeforeClass
+    public static void startElasticSearchNode() throws Exception {
+        node = NodeBuilder.nodeBuilder().data(true).settings(createSettings()).build();
+        node.start();
+        ensureEsGreen(node);
+        ClusterHealthResponse clusterHealth = node.client()
+                                                  .admin()
+                                                  .cluster()
+                                                  .health(Requests.clusterHealthRequest()
+                                                                  .timeout(TimeValue.timeValueSeconds(30))
+                                                                  .waitForGreenStatus()
+                                                                  .waitForRelocatingShards(0))
+                                                  .actionGet();
+        Thread.sleep(1000);
+    }
+
+    private static ImmutableSettings.Builder createSettings() {
+        return ImmutableSettings.builder()
+                                .put(ClusterName.SETTING, "test-cluster")
+                                .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
+                                .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
+                                .put(EsExecutors.PROCESSORS, 1)
+                                .put("http.enabled", false)
+                                .put("index.percolator.map_unmapped_fields_as_string", true)
+                                .put("index.store.type", "memory");
+    }
+
+    @AfterClass
+    public static void closeElasticSearchNode() throws Exception {
+        node.stop();
+        node.close();
+        FileUtils.deleteDirectory(new File("./data"));
+    }
+
+    private static void ensureEsGreen(Node node) {
+        ClusterHealthResponse chr = node.client()
+                                        .admin()
+                                        .cluster()
+                                        .health(Requests.clusterHealthRequest()
+                                                        .timeout(TimeValue.timeValueSeconds(30))
+                                                        .waitForGreenStatus()
+                                                        .waitForEvents(Priority.LANGUID)
+                                                        .waitForRelocatingShards(0))
+                                        .actionGet();
+        assertThat("cluster status is green", chr.getStatus(), equalTo(ClusterHealthStatus.GREEN));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/3094baf2/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltTest.java
index ae6b321..8c5ea55 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltTest.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltTest.java
@@ -17,65 +17,43 @@
  */
 package org.apache.storm.elasticsearch.bolt;
 
-import backtype.storm.Config;
-import backtype.storm.task.OutputCollector;
-import org.apache.commons.io.FileUtils;
-import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
-import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
-import org.elasticsearch.client.Requests;
-import org.elasticsearch.cluster.ClusterName;
-import org.elasticsearch.cluster.metadata.IndexMetaData;
-import org.elasticsearch.common.Priority;
-import org.elasticsearch.common.settings.ImmutableSettings;
-import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.common.util.concurrent.EsExecutors;
-import org.elasticsearch.node.Node;
-import org.elasticsearch.node.NodeBuilder;
+import org.apache.storm.elasticsearch.common.EsConfig;
 import org.junit.After;
-import org.junit.AfterClass;
 import org.junit.Before;
-import org.junit.BeforeClass;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
 
-import java.io.File;
+import backtype.storm.Config;
+import backtype.storm.task.OutputCollector;
 
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.mockito.Mockito.mock;
+@RunWith(MockitoJUnitRunner.class)
+abstract class AbstractEsBoltTest<Bolt extends AbstractEsBolt> {
 
-public class AbstractEsBoltTest {
     protected static Config config = new Config();
-    protected static OutputCollector collector = mock(OutputCollector.class);
-    protected static Node node;
 
-    @BeforeClass
-    public static void setup() throws Exception {
-        node = NodeBuilder.nodeBuilder().data(true).settings(
-                ImmutableSettings.builder()
-                        .put(ClusterName.SETTING, "test-cluster")
-                        .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
-                        .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
-                        .put(EsExecutors.PROCESSORS, 1)
-                        .put("http.enabled", false)
-                        .put("index.percolator.map_unmapped_fields_as_string", true)
-                        .put("index.store.type", "memory")
-        ).build();
-        node.start();
-        ensureEsGreen(node);
-        ClusterHealthResponse chr = node.client().admin().cluster()
-                .health(Requests.clusterHealthRequest().timeout(TimeValue.timeValueSeconds(30)).waitForGreenStatus().waitForRelocatingShards(0)).actionGet();
-        Thread.sleep(1000);
+    @Mock
+    protected OutputCollector outputCollector;
+
+    protected Bolt bolt;
+
+    @Before
+    public void createBolt() throws Exception {
+        bolt = createBolt(esConfig());
+        bolt.prepare(config, null, outputCollector);
     }
 
-    @AfterClass
-    public static void cleanup() throws Exception {
-        node.stop();
-        node.close();
-        FileUtils.deleteDirectory(new File("./data"));
+    protected abstract Bolt createBolt(EsConfig esConfig);
+
+    protected EsConfig esConfig() {
+        EsConfig esConfig = new EsConfig();
+        esConfig.setClusterName("test-cluster");
+        esConfig.setNodes(new String[] {"127.0.0.1:9300"});
+        return esConfig;
     }
 
-    private static void ensureEsGreen(Node node) {
-        ClusterHealthResponse chr = node.client().admin().cluster()
-                .health(Requests.clusterHealthRequest().timeout(TimeValue.timeValueSeconds(30)).waitForGreenStatus().waitForEvents(Priority.LANGUID).waitForRelocatingShards(0)).actionGet();
-        assertThat("cluster status is green", chr.getStatus(), equalTo(ClusterHealthStatus.GREEN));
+    @After
+    public void cleanupBolt() throws Exception {
+        bolt.cleanup();
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/3094baf2/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java
index dd4b088..d2def5d 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java
@@ -20,10 +20,7 @@ package org.apache.storm.elasticsearch.bolt;
 import backtype.storm.tuple.Tuple;
 import org.apache.storm.elasticsearch.common.EsConfig;
 import org.apache.storm.elasticsearch.common.EsTestUtil;
-import org.elasticsearch.action.count.CountRequest;
-import org.elasticsearch.action.count.CountRequestBuilder;
 import org.elasticsearch.action.count.CountResponse;
-import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.index.query.TermQueryBuilder;
 import org.junit.Assert;
 import org.junit.Test;
@@ -32,29 +29,20 @@ import org.slf4j.LoggerFactory;
 
 import static org.mockito.Mockito.verify;
 
-public class EsIndexBoltTest extends AbstractEsBoltTest{
+public class EsIndexBoltTest extends AbstractEsBoltIntegrationTest<EsIndexBolt> {
     private static final Logger LOG = LoggerFactory.getLogger(EsIndexBoltTest.class);
-    private EsIndexBolt bolt;
 
     @Test
     public void testEsIndexBolt()
             throws Exception {
-        EsConfig esConfig = new EsConfig();
-        esConfig.setClusterName("test-cluster");
-        esConfig.setNodes(new String[]{"127.0.0.1:9300"});
-
-        bolt = new EsIndexBolt(esConfig);
-        bolt.prepare(config, null, collector);
-
-        String source = "{\"user\":\"user1\"}";
         String index = "index1";
         String type = "type1";
-        String id = "docId";
-        Tuple tuple = EsTestUtil.generateTestTuple(source, index, type, id);
+
+        Tuple tuple = createTestTuple(index, type);
 
         bolt.execute(tuple);
 
-        verify(collector).ack(tuple);
+        verify(outputCollector).ack(tuple);
 
         node.client().admin().indices().prepareRefresh(index).execute().actionGet();
         CountResponse resp = node.client().prepareCount(index)
@@ -62,7 +50,16 @@ public class EsIndexBoltTest extends AbstractEsBoltTest{
                 .execute().actionGet();
 
         Assert.assertEquals(1, resp.getCount());
+    }
+
+    private Tuple createTestTuple(String index, String type) {
+        String source = "{\"user\":\"user1\"}";
+        String id = "docId";
+        return EsTestUtil.generateTestTuple(source, index, type, id);
+    }
 
-        bolt.cleanup();
+    @Override
+    protected EsIndexBolt createBolt(EsConfig esConfig) {
+        return new EsIndexBolt(esConfig);
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/3094baf2/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java
index fd4fa4f..d68cc89 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java
@@ -21,35 +21,27 @@ import backtype.storm.tuple.Tuple;
 import backtype.storm.tuple.Values;
 import org.apache.storm.elasticsearch.common.EsConfig;
 import org.apache.storm.elasticsearch.common.EsTestUtil;
-import org.elasticsearch.action.count.CountResponse;
 import org.elasticsearch.action.percolate.PercolateResponse;
-import org.elasticsearch.index.query.TermQueryBuilder;
-import org.junit.Assert;
 import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import static org.mockito.Mockito.verify;
 import static org.mockito.Matchers.any;
 
-public class EsPercolateBoltTest extends AbstractEsBoltTest {
-    private static final Logger LOG = LoggerFactory.getLogger(EsIndexBoltTest.class);
-    private EsPercolateBolt bolt;
+public class EsPercolateBoltTest extends AbstractEsBoltIntegrationTest<EsPercolateBolt> {
+
+    @Override
+    protected EsPercolateBolt createBolt(EsConfig esConfig) {
+        return  new EsPercolateBolt(esConfig);
+    }
 
     @Test
     public void testEsPercolateBolt()
             throws Exception {
-        EsConfig esConfig = new EsConfig();
-        esConfig.setClusterName("test-cluster");
-        esConfig.setNodes(new String[]{"localhost:9300"});
-        bolt = new EsPercolateBolt(esConfig);
-        bolt.prepare(config, null, collector);
-
         String source = "{\"user\":\"user1\"}";
         String index = "index1";
         String type = ".percolator";
 
-        node.client().prepareIndex("index1",".percolator")
+        node.client().prepareIndex("index1", ".percolator")
                 .setId("1")
                 .setSource("{\"query\":{\"match\":{\"user\":\"user1\"}}}").
                 execute().actionGet();
@@ -57,9 +49,7 @@ public class EsPercolateBoltTest extends AbstractEsBoltTest {
 
         bolt.execute(tuple);
 
-        verify(collector).ack(tuple);
-        verify(collector).emit(new Values(source, any(PercolateResponse.Match.class)));
-
-        bolt.cleanup();
+        verify(outputCollector).ack(tuple);
+        verify(outputCollector).emit(new Values(source, any(PercolateResponse.Match.class)));
     }
-}
\ No newline at end of file
+}