You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2015/08/28 00:12:49 UTC

[05/13] storm git commit: Implement EsLookupBolt

Implement EsLookupBolt


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/59999924
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/59999924
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/59999924

Branch: refs/heads/master
Commit: 599999246f4ce251d44c50a64423ccf8ea464b34
Parents: 3094baf
Author: Alex Panov <al...@teradata.com>
Authored: Thu Aug 20 12:30:17 2015 +0200
Committer: Alex Panov <al...@teradata.com>
Committed: Thu Aug 20 12:30:17 2015 +0200

----------------------------------------------------------------------
 .../elasticsearch/ElasticsearchGetRequest.java  |  10 ++
 .../storm/elasticsearch/bolt/EsLookupBolt.java  |  70 ++++++++++
 .../bolt/EsLookupResultOutput.java              |  15 +++
 .../bolt/EsLookupBoltIntegrationTest.java       | 130 +++++++++++++++++++
 .../elasticsearch/bolt/EsLookupBoltTest.java    | 102 +++++++++++++++
 5 files changed, 327 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/59999924/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/ElasticsearchGetRequest.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/ElasticsearchGetRequest.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/ElasticsearchGetRequest.java
new file mode 100644
index 0000000..442af3c
--- /dev/null
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/ElasticsearchGetRequest.java
@@ -0,0 +1,10 @@
+package org.apache.storm.elasticsearch;
+
+import org.elasticsearch.action.get.GetRequest;
+
+import backtype.storm.tuple.Tuple;
+
+public interface ElasticsearchGetRequest {
+
+    GetRequest extractFrom(Tuple tuple);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/59999924/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsLookupBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsLookupBolt.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsLookupBolt.java
new file mode 100644
index 0000000..b6063d4
--- /dev/null
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsLookupBolt.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.elasticsearch.bolt;
+
+import java.util.Collection;
+
+import org.apache.storm.elasticsearch.ElasticsearchGetRequest;
+import org.apache.storm.elasticsearch.common.EsConfig;
+import org.elasticsearch.action.get.GetRequest;
+import org.elasticsearch.action.get.GetResponse;
+
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+
+public class EsLookupBolt extends AbstractEsBolt {
+
+    private final ElasticsearchGetRequest getRequest;
+    private final EsLookupResultOutput output;
+
+    public EsLookupBolt(EsConfig esConfig, ElasticsearchGetRequest getRequest, EsLookupResultOutput output) {
+        super(esConfig);
+        this.getRequest = getRequest;
+        this.output = output;
+    }
+
+    @Override
+    public void execute(Tuple tuple) {
+        try {
+            Collection<Values> values = lookupValuesInEs(tuple);
+            tryEmitAndAck(values, tuple);
+        } catch (Exception e) {
+            collector.reportError(e);
+            collector.fail(tuple);
+        }
+    }
+
+    private Collection<Values> lookupValuesInEs(Tuple tuple) {
+        GetRequest request = getRequest.extractFrom(tuple);
+        GetResponse response = client.get(request).actionGet();
+        return output.toValues(response);
+    }
+
+    private void tryEmitAndAck(Collection<Values> values, Tuple tuple) {
+        for (Values value : values) {
+            collector.emit(tuple, value);
+        }
+        collector.ack(tuple);
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+        outputFieldsDeclarer.declare(output.fields());
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/59999924/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsLookupResultOutput.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsLookupResultOutput.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsLookupResultOutput.java
new file mode 100644
index 0000000..8696c6b
--- /dev/null
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsLookupResultOutput.java
@@ -0,0 +1,15 @@
+package org.apache.storm.elasticsearch.bolt;
+
+import java.util.Collection;
+
+import org.elasticsearch.action.get.GetResponse;
+
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+
+public interface EsLookupResultOutput {
+
+    Collection<Values> toValues(GetResponse response);
+
+    Fields fields();
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/59999924/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsLookupBoltIntegrationTest.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsLookupBoltIntegrationTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsLookupBoltIntegrationTest.java
new file mode 100644
index 0000000..2a544f9
--- /dev/null
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsLookupBoltIntegrationTest.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.elasticsearch.bolt;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.UUID;
+
+import org.apache.storm.elasticsearch.ElasticsearchGetRequest;
+import org.apache.storm.elasticsearch.common.EsConfig;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.get.GetRequest;
+import org.elasticsearch.action.get.GetResponse;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.verify;
+
+@RunWith(MockitoJUnitRunner.class)
+public class EsLookupBoltIntegrationTest extends AbstractEsBoltIntegrationTest<EsLookupBolt> {
+
+    private final String documentId = UUID.randomUUID().toString();
+    private final String indexName = "index";
+    private final String typeName = "type";
+    private final String source = "{\"user\":\"user1\"}";
+
+    private ElasticsearchGetRequest getRequest = new TestElasticsearchGetRequest();
+    private EsLookupResultOutput output = new TestEsLookupResultOutput();
+
+    @Captor
+    private ArgumentCaptor<Tuple> anchor;
+
+    @Captor
+    private ArgumentCaptor<Values> emmitedValues;
+
+    @Mock
+    private Tuple tuple;
+
+    @Override
+    protected EsLookupBolt createBolt(EsConfig esConfig) {
+        return new EsLookupBolt(esConfig, getRequest, output);
+    }
+
+    @Before
+    public void populateIndexWithTestData() throws Exception {
+        node.client().prepareIndex(indexName, typeName, documentId).setSource(source).execute().actionGet();
+    }
+
+    @Before
+    public void clearIndex() throws Exception {
+        node.client().delete(new DeleteRequest(indexName, typeName, documentId)).actionGet();
+    }
+
+    @Test
+    public void anchorsTheTuple() throws Exception {
+        bolt.execute(tuple);
+
+        verify(outputCollector).emit(anchor.capture(), emmitedValues.capture());
+        assertThat(anchor.getValue(), is(tuple));
+    }
+
+    @Test
+    public void emitsExpectedValues() throws Exception {
+        Values expectedValues = expectedValues();
+
+        bolt.execute(tuple);
+
+        verify(outputCollector).emit(anchor.capture(), emmitedValues.capture());
+        assertThat(emmitedValues.getValue(), is(expectedValues));
+    }
+
+    @Test
+    public void acksTuple() throws Exception {
+        bolt.execute(tuple);
+
+        verify(outputCollector).ack(anchor.capture());
+        assertThat(anchor.getValue(), is(tuple));
+    }
+
+    private Values expectedValues() {
+        return new Values(source);
+    }
+
+    private class TestElasticsearchGetRequest implements ElasticsearchGetRequest {
+
+        @Override
+        public GetRequest extractFrom(Tuple tuple) {
+            return node.client().prepareGet().setId(documentId).setIndex(indexName).setType(typeName).request();
+        }
+    }
+
+    private class TestEsLookupResultOutput implements EsLookupResultOutput {
+
+        @Override
+        public Collection<Values> toValues(GetResponse response) {
+            return Collections.singleton(expectedValues());
+        }
+
+        @Override
+        public Fields fields() {
+            return new Fields("data");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/59999924/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsLookupBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsLookupBoltTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsLookupBoltTest.java
new file mode 100644
index 0000000..602ed6d
--- /dev/null
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsLookupBoltTest.java
@@ -0,0 +1,102 @@
+package org.apache.storm.elasticsearch.bolt;
+
+import java.util.Collections;
+import java.util.UUID;
+
+import org.apache.storm.elasticsearch.ElasticsearchGetRequest;
+import org.apache.storm.elasticsearch.common.EsConfig;
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.action.get.GetRequest;
+import org.elasticsearch.action.get.GetResponse;
+import org.elasticsearch.client.Client;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class EsLookupBoltTest extends AbstractEsBoltTest<EsLookupBolt> {
+
+    @Mock
+    private EsConfig esConfig;
+
+    @Mock
+    private ElasticsearchGetRequest getRequest;
+
+    @Mock
+    private EsLookupResultOutput output;
+
+    @Mock
+    private Tuple tuple;
+
+    @Mock
+    private GetRequest request;
+
+    @Mock
+    private Client client;
+
+    private Client originalClient;
+
+    @Override
+    protected EsLookupBolt createBolt(EsConfig esConfig) {
+        originalClient = EsLookupBolt.getClient();
+        EsLookupBolt.replaceClient(this.client);
+        return new EsLookupBolt(esConfig, getRequest, output);
+    }
+
+    @After
+    public void replaceClientWithOriginal() throws Exception {
+        EsLookupBolt.replaceClient(originalClient);
+    }
+
+    @Before
+    public void configureBoltDependencies() throws Exception {
+        when(getRequest.extractFrom(tuple)).thenReturn(request);
+        when(output.toValues(any(GetResponse.class))).thenReturn(Collections.singleton(new Values("")));
+    }
+
+    @Test
+    public void failsTupleWhenClientThrows() throws Exception {
+        when(client.get(request)).thenThrow(ElasticsearchException.class);
+        bolt.execute(tuple);
+
+        verify(outputCollector).fail(tuple);
+    }
+
+    @Test
+    public void reportsExceptionWhenClientThrows() throws Exception {
+        ElasticsearchException elasticsearchException = new ElasticsearchException("dummy");
+        when(client.get(request)).thenThrow(elasticsearchException);
+        bolt.execute(tuple);
+
+        verify(outputCollector).reportError(elasticsearchException);
+    }
+
+    @Test
+    public void fieldsAreDeclaredThroughProvidedOutput() throws Exception {
+        Fields fields = new Fields(UUID.randomUUID().toString());
+        when(output.fields()).thenReturn(fields);
+        OutputFieldsDeclarer declarer = mock(OutputFieldsDeclarer.class);
+        bolt.declareOutputFields(declarer);
+
+        ArgumentCaptor<Fields> declaredFields = ArgumentCaptor.forClass(Fields.class);
+        verify(declarer).declare(declaredFields.capture());
+
+        assertThat(declaredFields.getValue(), is(fields));
+    }
+}