You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/10/20 23:15:39 UTC

git commit: integration test harness; very basic test for persist writer using harness; tests for new processor capability

Repository: incubator-streams
Updated Branches:
  refs/heads/STREAMS-170 ddf359c50 -> cf4fd7724


integration test harness
very basic test for persist writer using harness
tests for new processor capability


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/cf4fd772
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/cf4fd772
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/cf4fd772

Branch: refs/heads/STREAMS-170
Commit: cf4fd7724164db06f178c0ef2aade45797be897f
Parents: ddf359c
Author: sblackmon <sb...@apache.org>
Authored: Mon Oct 20 16:15:35 2014 -0500
Committer: sblackmon <sb...@apache.org>
Committed: Mon Oct 20 16:15:35 2014 -0500

----------------------------------------------------------------------
 .../streams-persist-elasticsearch/pom.xml       | 47 ++++++++++++
 .../processor/DocumentToMetadataProcessor.java  | 29 +++----
 .../ElasticsearchConfiguration.json             |  3 +-
 .../test/TestDatumFromMetadataProcessor.java    | 81 ++++++++++++++++++++
 .../test/TestDocumentToMetadataProcessor.java   | 63 +++++++++++++++
 .../test/TestElasticsearchPersistWriter.java    | 70 +++++++++++++++++
 6 files changed, 274 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cf4fd772/streams-contrib/streams-persist-elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/pom.xml b/streams-contrib/streams-persist-elasticsearch/pom.xml
index 95f3357..b75c38d 100644
--- a/streams-contrib/streams-persist-elasticsearch/pom.xml
+++ b/streams-contrib/streams-persist-elasticsearch/pom.xml
@@ -13,10 +13,34 @@
 
     <properties>
         <elasticsearch.version>1.1.0</elasticsearch.version>
+        <lucene.version>4.7.2</lucene.version>
     </properties>
 
     <dependencies>
+        <!-- Test includes -->
         <dependency>
+            <groupId>org.apache.lucene</groupId>
+            <artifactId>lucene-test-framework</artifactId>
+            <version>${lucene.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.lucene</groupId>
+            <artifactId>lucene-codecs</artifactId>
+            <version>${lucene.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.elasticsearch</groupId>
+            <artifactId>elasticsearch</artifactId>
+            <type>test-jar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.hamcrest</groupId>
+            <artifactId>hamcrest-all</artifactId>
+            <scope>test</scope>
+        </dependency>
+       <dependency>
             <groupId>org.apache.streams</groupId>
             <artifactId>streams-config</artifactId>
             <version>${project.version}</version>
@@ -55,6 +79,29 @@
                 <artifactId>elasticsearch</artifactId>
                 <version>${elasticsearch.version}</version>
             </dependency>
+            <dependency>
+                <groupId>org.elasticsearch</groupId>
+                <artifactId>elasticsearch</artifactId>
+                <type>test-jar</type>
+                <version>${elasticsearch.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.lucene</groupId>
+                <artifactId>lucene-test-framework</artifactId>
+                <version>${lucene.version}</version>
+                <scope>test</scope>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.lucene</groupId>
+                <artifactId>lucene-codecs</artifactId>
+                <version>${lucene.version}</version>
+                <scope>test</scope>
+            </dependency>
+            <dependency>
+                <groupId>org.hamcrest</groupId>
+                <artifactId>hamcrest-all</artifactId>
+                <version>1.3</version>
+            </dependency>
         </dependencies>
     </dependencyManagement>
     <build>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cf4fd772/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DocumentToMetadataProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DocumentToMetadataProcessor.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DocumentToMetadataProcessor.java
index c4c654f..ed449fd 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DocumentToMetadataProcessor.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DocumentToMetadataProcessor.java
@@ -18,6 +18,7 @@
 
 package org.apache.streams.elasticsearch.processor;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -32,6 +33,8 @@ import org.apache.streams.elasticsearch.ElasticsearchClientManager;
 import org.apache.streams.elasticsearch.ElasticsearchConfigurator;
 import org.apache.streams.elasticsearch.ElasticsearchReaderConfiguration;
 import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -49,32 +52,24 @@ public class DocumentToMetadataProcessor implements StreamsProcessor, Serializab
 
     public final static String STREAMS_ID = "DatumFromMetadataProcessor";
 
-    private ElasticsearchClientManager elasticsearchClientManager;
-    private ElasticsearchReaderConfiguration config;
-
     private ObjectMapper mapper;
 
-    public DocumentToMetadataProcessor() {
-        Config config = StreamsConfigurator.config.getConfig("elasticsearch");
-        this.config = ElasticsearchConfigurator.detectReaderConfiguration(config);
-    }
+    private static final Logger LOGGER = LoggerFactory.getLogger(DocumentToMetadataProcessor.class);
 
-    public DocumentToMetadataProcessor(Config config) {
-        this.config = ElasticsearchConfigurator.detectReaderConfiguration(config);
-    }
-
-    public DocumentToMetadataProcessor(ElasticsearchReaderConfiguration config) {
-        this.config = config;
+    public DocumentToMetadataProcessor() {
     }
 
     @Override
     public List<StreamsDatum> process(StreamsDatum entry) {
         List<StreamsDatum> result = Lists.newArrayList();
 
+        Object object = entry.getDocument();
         ObjectNode metadataObjectNode;
         try {
-            metadataObjectNode = mapper.readValue((String) entry.getDocument(), ObjectNode.class);
-        } catch (IOException e) {
+            String docAsJson = (object instanceof String) ? object.toString() : mapper.writeValueAsString(object);
+            metadataObjectNode = mapper.readValue(docAsJson, ObjectNode.class);
+        } catch (Throwable e) {
+            LOGGER.warn("Exception: {}", e.getMessage(), e);
             return result;
         }
 
@@ -92,15 +87,13 @@ public class DocumentToMetadataProcessor implements StreamsProcessor, Serializab
 
     @Override
     public void prepare(Object configurationObject) {
-        this.elasticsearchClientManager = new ElasticsearchClientManager(config);
         mapper = StreamsJacksonMapper.getInstance();
         mapper.registerModule(new JsonOrgModule());
-
     }
 
     @Override
     public void cleanUp() {
-        this.elasticsearchClientManager.getClient().close();
+        mapper = null;
     }
 
     public static Map<String, Object> asMap(JsonNode node) {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cf4fd772/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchConfiguration.json b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchConfiguration.json
index 6524dcc..e2ed37c 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchConfiguration.json
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchConfiguration.json
@@ -14,7 +14,8 @@
         },
         "port": {
             "type": "integer",
-            "description": "Elasticsearch Transport API port"
+            "description": "Elasticsearch Transport API port",
+            "default": 9300
         },
         "clusterName": {
             "type": "string",

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cf4fd772/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestDatumFromMetadataProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestDatumFromMetadataProcessor.java b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestDatumFromMetadataProcessor.java
new file mode 100644
index 0000000..66434bc
--- /dev/null
+++ b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestDatumFromMetadataProcessor.java
@@ -0,0 +1,81 @@
+package org.apache.streams.elasticsearch.test;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.lang.SerializationUtils;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.elasticsearch.ElasticsearchConfiguration;
+import org.apache.streams.elasticsearch.ElasticsearchPersistWriter;
+import org.apache.streams.elasticsearch.ElasticsearchReaderConfiguration;
+import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration;
+import org.apache.streams.elasticsearch.processor.DatumFromMetadataProcessor;
+import org.apache.streams.elasticsearch.processor.DocumentToMetadataProcessor;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.elasticsearch.test.ElasticsearchIntegrationTest;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Created by sblackmon on 10/20/14.
+ */
+@ElasticsearchIntegrationTest.ClusterScope(scope= ElasticsearchIntegrationTest.Scope.TEST, numNodes=1)
+public class TestDatumFromMetadataProcessor extends ElasticsearchIntegrationTest {
+
+    private final String TEST_INDEX = "TestDatumFromMetadataProcessor".toLowerCase();
+
+    private ElasticsearchReaderConfiguration testConfiguration;
+
+    @Test
+    public void testSerializability() {
+        DatumFromMetadataProcessor processor = new DatumFromMetadataProcessor(testConfiguration);
+        DatumFromMetadataProcessor clone = (DatumFromMetadataProcessor) SerializationUtils.clone(processor);
+        Assert.assertNotNull(clone);
+    }
+
+    @Before
+    public void prepareTest() {
+        // Reader configuration naming localhost and the cluster name reported by cluster().
+        testConfiguration = new ElasticsearchReaderConfiguration();
+        testConfiguration.setHosts(Lists.newArrayList("localhost"));
+        testConfiguration.setClusterName(cluster().getClusterName());
+
+        // Index one document (type "activity", id "id") that the processor test looks up.
+        String testJsonString = "{\"dummy\":\"true\"}";
+        client().index(client().prepareIndex(TEST_INDEX, "activity", "id").setSource(testJsonString).request()).actionGet(5, TimeUnit.SECONDS);
+
+    }
+
+    @Test
+    public void testDatumFromMetadataProcessor() {
+        // Metadata addressing the document indexed in prepareTest().
+        Map<String, Object> metadata = Maps.newHashMap();
+
+        metadata.put("index", TEST_INDEX);
+        metadata.put("type", "activity");
+        metadata.put("id", "id");
+
+        DatumFromMetadataProcessor processor = new DatumFromMetadataProcessor(testConfiguration);
+
+        StreamsDatum testInput = new StreamsDatum(null);
+
+        testInput.setMetadata(metadata);
+
+        Assert.assertNull(testInput.document);
+
+        processor.prepare(null);
+
+        StreamsDatum testOutput = processor.process(testInput).get(0);
+
+        processor.cleanUp();
+
+        // The datum returned by the processor is expected to carry a document.
+        Assert.assertNotNull(testOutput.document);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cf4fd772/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestDocumentToMetadataProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestDocumentToMetadataProcessor.java b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestDocumentToMetadataProcessor.java
new file mode 100644
index 0000000..b2bfa84
--- /dev/null
+++ b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestDocumentToMetadataProcessor.java
@@ -0,0 +1,63 @@
+package org.apache.streams.elasticsearch.test;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.SerializationUtils;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.elasticsearch.ElasticsearchReaderConfiguration;
+import org.apache.streams.elasticsearch.processor.DocumentToMetadataProcessor;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Created by sblackmon on 10/20/14.
+ */
+public class TestDocumentToMetadataProcessor {
+
+    private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+
+    @Before
+    public void prepareTest() {
+        // Intentionally empty: these tests set up no shared fixture.
+    }
+
+    @Test
+    public void testSerializability() {
+        DocumentToMetadataProcessor processor = new DocumentToMetadataProcessor();
+        DocumentToMetadataProcessor clone = (DocumentToMetadataProcessor) SerializationUtils.clone(processor);
+        Assert.assertNotNull(clone);
+    }
+
+    @Test
+    public void testDocumentToMetadataProcessor() {
+
+        ObjectNode document = MAPPER.createObjectNode()
+                .put("a", "a")
+                .put("b", "b")
+                .put("c", 6);
+
+        DocumentToMetadataProcessor processor = new DocumentToMetadataProcessor();
+
+        StreamsDatum testInput = new StreamsDatum(document);
+
+        Assert.assertNotNull(testInput.document);
+        Assert.assertNotNull(testInput.metadata);
+        Assert.assertEquals(0, testInput.metadata.size());
+
+        processor.prepare(null);
+
+        StreamsDatum testOutput = processor.process(testInput).get(0);
+
+        processor.cleanUp();
+
+        // One metadata entry is expected per top-level field of the document (a, b, c).
+        Assert.assertNotNull(testOutput.metadata);
+        Assert.assertEquals(3, testOutput.metadata.size());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cf4fd772/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestElasticsearchPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestElasticsearchPersistWriter.java b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestElasticsearchPersistWriter.java
new file mode 100644
index 0000000..8452592
--- /dev/null
+++ b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestElasticsearchPersistWriter.java
@@ -0,0 +1,70 @@
+package org.apache.streams.elasticsearch.test;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.commons.lang.SerializationUtils;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.elasticsearch.ElasticsearchConfiguration;
+import org.apache.streams.elasticsearch.ElasticsearchPersistWriter;
+import org.apache.streams.elasticsearch.ElasticsearchReaderConfiguration;
+import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.test.ElasticsearchIntegrationTest;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Created by sblackmon on 10/20/14.
+ */
+@ElasticsearchIntegrationTest.ClusterScope(scope= ElasticsearchIntegrationTest.Scope.TEST, numNodes=1)
+public class TestElasticsearchPersistWriter extends ElasticsearchIntegrationTest {
+
+    private final String TEST_INDEX = "TestElasticsearchPersistWriter".toLowerCase();
+
+    private ElasticsearchWriterConfiguration testConfiguration;
+
+    @Before
+    public void prepareTest() {
+        // Writer configuration naming localhost and the cluster name reported by cluster().
+        testConfiguration = new ElasticsearchWriterConfiguration();
+        testConfiguration.setHosts(Lists.newArrayList("localhost"));
+        testConfiguration.setClusterName(cluster().getClusterName());
+
+    }
+
+    @Test
+    public void testPersistWriterString() {
+        // Hosts and cluster name are set in prepareTest(); add the writer-specific settings.
+        testConfiguration.setBatchSize(1L);
+        testConfiguration.setIndex(TEST_INDEX);
+        testConfiguration.setType("string");
+
+        ElasticsearchPersistWriter testPersistWriter = new ElasticsearchPersistWriter(testConfiguration);
+        testPersistWriter.prepare(null);
+
+        String testJsonString = "{\"dummy\":\"true\"}";
+
+        // JUnit assertions are used so the checks run even when the JVM lacks -ea.
+        org.junit.Assert.assertFalse(indexExists(TEST_INDEX));
+
+        testPersistWriter.write(new StreamsDatum(testJsonString, "test"));
+
+        testPersistWriter.cleanUp();
+
+        flushAndRefresh();
+
+        // The write is expected to have created the index with at least one document.
+        org.junit.Assert.assertTrue(indexExists(TEST_INDEX));
+
+        long count = client().count(client().prepareCount().request()).actionGet().getCount();
+
+        org.junit.Assert.assertTrue(count > 0);
+    }
+}