You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/10/21 21:45:38 UTC

[08/10] git commit: integration test harness very basic test for persist writer using harness tests for new processor capability

integration test harness
very basic test for persist writer using harness
tests for new processor capability


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/cf4fd772
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/cf4fd772
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/cf4fd772

Branch: refs/heads/master
Commit: cf4fd7724164db06f178c0ef2aade45797be897f
Parents: ddf359c
Author: sblackmon <sb...@apache.org>
Authored: Mon Oct 20 16:15:35 2014 -0500
Committer: sblackmon <sb...@apache.org>
Committed: Mon Oct 20 16:15:35 2014 -0500

----------------------------------------------------------------------
 .../streams-persist-elasticsearch/pom.xml       | 47 ++++++++++++
 .../processor/DocumentToMetadataProcessor.java  | 29 +++----
 .../ElasticsearchConfiguration.json             |  3 +-
 .../test/TestDatumFromMetadataProcessor.java    | 81 ++++++++++++++++++++
 .../test/TestDocumentToMetadataProcessor.java   | 63 +++++++++++++++
 .../test/TestElasticsearchPersistWriter.java    | 70 +++++++++++++++++
 6 files changed, 274 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cf4fd772/streams-contrib/streams-persist-elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/pom.xml b/streams-contrib/streams-persist-elasticsearch/pom.xml
index 95f3357..b75c38d 100644
--- a/streams-contrib/streams-persist-elasticsearch/pom.xml
+++ b/streams-contrib/streams-persist-elasticsearch/pom.xml
@@ -13,10 +13,34 @@
 
     <properties>
         <elasticsearch.version>1.1.0</elasticsearch.version>
+        <lucene.version>4.7.2</lucene.version>
     </properties>
 
     <dependencies>
+        <!-- Test includes -->
         <dependency>
+            <groupId>org.apache.lucene</groupId>
+            <artifactId>lucene-test-framework</artifactId>
+            <version>${lucene.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.lucene</groupId>
+            <artifactId>lucene-codecs</artifactId>
+            <version>${lucene.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.elasticsearch</groupId>
+            <artifactId>elasticsearch</artifactId>
+            <type>test-jar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.hamcrest</groupId>
+            <artifactId>hamcrest-all</artifactId>
+            <scope>test</scope>
+        </dependency>
+       <dependency>
             <groupId>org.apache.streams</groupId>
             <artifactId>streams-config</artifactId>
             <version>${project.version}</version>
@@ -55,6 +79,29 @@
                 <artifactId>elasticsearch</artifactId>
                 <version>${elasticsearch.version}</version>
             </dependency>
+            <dependency>
+                <groupId>org.elasticsearch</groupId>
+                <artifactId>elasticsearch</artifactId>
+                <type>test-jar</type>
+                <version>${elasticsearch.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.lucene</groupId>
+                <artifactId>lucene-test-framework</artifactId>
+                <version>${lucene.version}</version>
+                <scope>test</scope>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.lucene</groupId>
+                <artifactId>lucene-codecs</artifactId>
+                <version>${lucene.version}</version>
+                <scope>test</scope>
+            </dependency>
+            <dependency>
+                <groupId>org.hamcrest</groupId>
+                <artifactId>hamcrest-all</artifactId>
+                <version>1.3</version>
+            </dependency>
         </dependencies>
     </dependencyManagement>
     <build>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cf4fd772/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DocumentToMetadataProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DocumentToMetadataProcessor.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DocumentToMetadataProcessor.java
index c4c654f..ed449fd 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DocumentToMetadataProcessor.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DocumentToMetadataProcessor.java
@@ -18,6 +18,7 @@
 
 package org.apache.streams.elasticsearch.processor;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -32,6 +33,8 @@ import org.apache.streams.elasticsearch.ElasticsearchClientManager;
 import org.apache.streams.elasticsearch.ElasticsearchConfigurator;
 import org.apache.streams.elasticsearch.ElasticsearchReaderConfiguration;
 import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -49,32 +52,24 @@ public class DocumentToMetadataProcessor implements StreamsProcessor, Serializab
 
     public final static String STREAMS_ID = "DatumFromMetadataProcessor";
 
-    private ElasticsearchClientManager elasticsearchClientManager;
-    private ElasticsearchReaderConfiguration config;
-
     private ObjectMapper mapper;
 
-    public DocumentToMetadataProcessor() {
-        Config config = StreamsConfigurator.config.getConfig("elasticsearch");
-        this.config = ElasticsearchConfigurator.detectReaderConfiguration(config);
-    }
+    private static final Logger LOGGER = LoggerFactory.getLogger(DocumentToMetadataProcessor.class);
 
-    public DocumentToMetadataProcessor(Config config) {
-        this.config = ElasticsearchConfigurator.detectReaderConfiguration(config);
-    }
-
-    public DocumentToMetadataProcessor(ElasticsearchReaderConfiguration config) {
-        this.config = config;
+    public DocumentToMetadataProcessor() {
     }
 
     @Override
     public List<StreamsDatum> process(StreamsDatum entry) {
         List<StreamsDatum> result = Lists.newArrayList();
 
+        Object object = entry.getDocument();
         ObjectNode metadataObjectNode;
         try {
-            metadataObjectNode = mapper.readValue((String) entry.getDocument(), ObjectNode.class);
-        } catch (IOException e) {
+            String docAsJson = (object instanceof String) ? object.toString() : mapper.writeValueAsString(object);
+            metadataObjectNode = mapper.readValue(docAsJson, ObjectNode.class);
+        } catch (Throwable e) {
+            LOGGER.warn("Exception: %s", e.getMessage());
             return result;
         }
 
@@ -92,15 +87,13 @@ public class DocumentToMetadataProcessor implements StreamsProcessor, Serializab
 
     @Override
     public void prepare(Object configurationObject) {
-        this.elasticsearchClientManager = new ElasticsearchClientManager(config);
         mapper = StreamsJacksonMapper.getInstance();
         mapper.registerModule(new JsonOrgModule());
-
     }
 
     @Override
     public void cleanUp() {
-        this.elasticsearchClientManager.getClient().close();
+        mapper = null;
     }
 
     public static Map<String, Object> asMap(JsonNode node) {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cf4fd772/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchConfiguration.json b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchConfiguration.json
index 6524dcc..e2ed37c 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchConfiguration.json
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchConfiguration.json
@@ -14,7 +14,8 @@
         },
         "port": {
             "type": "integer",
-            "description": "Elasticsearch Transport API port"
+            "description": "Elasticsearch Transport API port",
+            "default": 9300
         },
         "clusterName": {
             "type": "string",

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cf4fd772/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestDatumFromMetadataProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestDatumFromMetadataProcessor.java b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestDatumFromMetadataProcessor.java
new file mode 100644
index 0000000..66434bc
--- /dev/null
+++ b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestDatumFromMetadataProcessor.java
@@ -0,0 +1,81 @@
+package org.apache.streams.elasticsearch.test;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.lang.SerializationUtils;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.elasticsearch.ElasticsearchConfiguration;
+import org.apache.streams.elasticsearch.ElasticsearchPersistWriter;
+import org.apache.streams.elasticsearch.ElasticsearchReaderConfiguration;
+import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration;
+import org.apache.streams.elasticsearch.processor.DatumFromMetadataProcessor;
+import org.apache.streams.elasticsearch.processor.DocumentToMetadataProcessor;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.elasticsearch.test.ElasticsearchIntegrationTest;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Created by sblackmon on 10/20/14.
+ */
+@ElasticsearchIntegrationTest.ClusterScope(scope= ElasticsearchIntegrationTest.Scope.TEST, numNodes=1)
+public class TestDatumFromMetadataProcessor extends ElasticsearchIntegrationTest {
+
+    private final String TEST_INDEX = "TestDatumFromMetadataProcessor".toLowerCase();
+
+    private ElasticsearchReaderConfiguration testConfiguration;
+
+    @Test
+    public void testSerializability() {
+        DatumFromMetadataProcessor processor = new DatumFromMetadataProcessor(testConfiguration);
+
+        DatumFromMetadataProcessor clone = (DatumFromMetadataProcessor) SerializationUtils.clone(processor);
+    }
+
+    @Before
+    public void prepareTest() {
+
+        testConfiguration = new ElasticsearchReaderConfiguration();
+        testConfiguration.setHosts(Lists.newArrayList("localhost"));
+        testConfiguration.setClusterName(cluster().getClusterName());
+
+        String testJsonString = "{\"dummy\":\"true\"}";
+
+        client().index(client().prepareIndex(TEST_INDEX, "activity", "id").setSource(testJsonString).request()).actionGet(5, TimeUnit.SECONDS);
+
+    }
+
+    @Test
+    public void testDatumFromMetadataProcessor() {
+
+        Map<String, Object> metadata = Maps.newHashMap();
+
+        metadata.put("index", TEST_INDEX);
+        metadata.put("type", "activity");
+        metadata.put("id", "id");
+
+        DatumFromMetadataProcessor processor = new DatumFromMetadataProcessor(testConfiguration);
+
+        StreamsDatum testInput = new StreamsDatum(null);
+
+        testInput.setMetadata(metadata);
+
+        Assert.assertNull(testInput.document);
+
+        processor.prepare(null);
+
+        StreamsDatum testOutput = processor.process(testInput).get(0);
+
+        processor.cleanUp();
+
+        Assert.assertNotNull(testOutput.document);
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cf4fd772/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestDocumentToMetadataProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestDocumentToMetadataProcessor.java b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestDocumentToMetadataProcessor.java
new file mode 100644
index 0000000..b2bfa84
--- /dev/null
+++ b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestDocumentToMetadataProcessor.java
@@ -0,0 +1,63 @@
+package org.apache.streams.elasticsearch.test;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.SerializationUtils;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.elasticsearch.ElasticsearchReaderConfiguration;
+import org.apache.streams.elasticsearch.processor.DocumentToMetadataProcessor;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Created by sblackmon on 10/20/14.
+ */
+public class TestDocumentToMetadataProcessor {
+
+    private static ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+
+    @Before
+    public void prepareTest() {
+
+    }
+
+    @Test
+    public void testSerializability() {
+        DocumentToMetadataProcessor processor = new DocumentToMetadataProcessor();
+
+        DocumentToMetadataProcessor clone = (DocumentToMetadataProcessor) SerializationUtils.clone(processor);
+    }
+
+    @Test
+    public void testDocumentToMetadataProcessor() {
+
+        ObjectNode document = MAPPER.createObjectNode()
+                .put("a", "a")
+                .put("b", "b")
+                .put("c", 6);
+
+        DocumentToMetadataProcessor processor = new DocumentToMetadataProcessor();
+
+        StreamsDatum testInput = new StreamsDatum(document);
+
+        Assert.assertNotNull(testInput.document);
+        Assert.assertNotNull(testInput.metadata);
+        Assert.assertEquals(testInput.metadata.size(), 0);
+
+        processor.prepare(null);
+
+        StreamsDatum testOutput = processor.process(testInput).get(0);
+
+        processor.cleanUp();
+
+        Assert.assertNotNull(testOutput.metadata);
+        Assert.assertEquals(testInput.metadata.size(), 3);
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cf4fd772/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestElasticsearchPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestElasticsearchPersistWriter.java b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestElasticsearchPersistWriter.java
new file mode 100644
index 0000000..8452592
--- /dev/null
+++ b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestElasticsearchPersistWriter.java
@@ -0,0 +1,70 @@
+package org.apache.streams.elasticsearch.test;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.commons.lang.SerializationUtils;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.elasticsearch.ElasticsearchConfiguration;
+import org.apache.streams.elasticsearch.ElasticsearchPersistWriter;
+import org.apache.streams.elasticsearch.ElasticsearchReaderConfiguration;
+import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.test.ElasticsearchIntegrationTest;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Created by sblackmon on 10/20/14.
+ */
+@ElasticsearchIntegrationTest.ClusterScope(scope= ElasticsearchIntegrationTest.Scope.TEST, numNodes=1)
+public class TestElasticsearchPersistWriter extends ElasticsearchIntegrationTest {
+
+    private final String TEST_INDEX = "TestElasticsearchPersistWriter".toLowerCase();
+
+    private ElasticsearchWriterConfiguration testConfiguration;
+
+    public void prepareTest() {
+
+        testConfiguration = new ElasticsearchWriterConfiguration();
+        testConfiguration.setHosts(Lists.newArrayList("localhost"));
+        testConfiguration.setClusterName(cluster().getClusterName());
+
+    }
+
+   @Test
+    public void testPersistWriterString() {
+
+        ElasticsearchWriterConfiguration testConfiguration = new ElasticsearchWriterConfiguration();
+        testConfiguration.setHosts(Lists.newArrayList("localhost"));
+        testConfiguration.setClusterName(cluster().getClusterName());
+        testConfiguration.setBatchSize(1l);
+        testConfiguration.setIndex(TEST_INDEX);
+        testConfiguration.setType("string");
+        ElasticsearchPersistWriter testPersistWriter = new ElasticsearchPersistWriter(testConfiguration);
+        testPersistWriter.prepare(null);
+
+        String testJsonString = "{\"dummy\":\"true\"}";
+
+        assert(!indexExists(TEST_INDEX));
+
+        testPersistWriter.write(new StreamsDatum(testJsonString, "test"));
+
+        testPersistWriter.cleanUp();
+
+        flushAndRefresh();
+
+        assert(indexExists(TEST_INDEX));
+
+        long count = client().count(client().prepareCount().request()).actionGet().getCount();
+
+        assert(count > 0);
+
+    }
+}