You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/10/20 23:15:39 UTC
git commit: integration test harness; very basic test for persist
writer using harness; tests for new processor capability
Repository: incubator-streams
Updated Branches:
refs/heads/STREAMS-170 ddf359c50 -> cf4fd7724
integration test harness
very basic test for persist writer using harness
tests for new processor capability
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/cf4fd772
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/cf4fd772
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/cf4fd772
Branch: refs/heads/STREAMS-170
Commit: cf4fd7724164db06f178c0ef2aade45797be897f
Parents: ddf359c
Author: sblackmon <sb...@apache.org>
Authored: Mon Oct 20 16:15:35 2014 -0500
Committer: sblackmon <sb...@apache.org>
Committed: Mon Oct 20 16:15:35 2014 -0500
----------------------------------------------------------------------
.../streams-persist-elasticsearch/pom.xml | 47 ++++++++++++
.../processor/DocumentToMetadataProcessor.java | 29 +++----
.../ElasticsearchConfiguration.json | 3 +-
.../test/TestDatumFromMetadataProcessor.java | 81 ++++++++++++++++++++
.../test/TestDocumentToMetadataProcessor.java | 63 +++++++++++++++
.../test/TestElasticsearchPersistWriter.java | 70 +++++++++++++++++
6 files changed, 274 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cf4fd772/streams-contrib/streams-persist-elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/pom.xml b/streams-contrib/streams-persist-elasticsearch/pom.xml
index 95f3357..b75c38d 100644
--- a/streams-contrib/streams-persist-elasticsearch/pom.xml
+++ b/streams-contrib/streams-persist-elasticsearch/pom.xml
@@ -13,10 +13,34 @@
<properties>
<elasticsearch.version>1.1.0</elasticsearch.version>
+ <lucene.version>4.7.2</lucene.version>
</properties>
<dependencies>
+ <!-- Test includes -->
<dependency>
+ <groupId>org.apache.lucene</groupId>
+ <artifactId>lucene-test-framework</artifactId>
+ <version>${lucene.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.lucene</groupId>
+ <artifactId>lucene-codecs</artifactId>
+ <version>${lucene.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.elasticsearch</groupId>
+ <artifactId>elasticsearch</artifactId>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-config</artifactId>
<version>${project.version}</version>
@@ -55,6 +79,29 @@
<artifactId>elasticsearch</artifactId>
<version>${elasticsearch.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.elasticsearch</groupId>
+ <artifactId>elasticsearch</artifactId>
+ <type>test-jar</type>
+ <version>${elasticsearch.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.lucene</groupId>
+ <artifactId>lucene-test-framework</artifactId>
+ <version>${lucene.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.lucene</groupId>
+ <artifactId>lucene-codecs</artifactId>
+ <version>${lucene.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-all</artifactId>
+ <version>1.3</version>
+ </dependency>
</dependencies>
</dependencyManagement>
<build>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cf4fd772/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DocumentToMetadataProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DocumentToMetadataProcessor.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DocumentToMetadataProcessor.java
index c4c654f..ed449fd 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DocumentToMetadataProcessor.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DocumentToMetadataProcessor.java
@@ -18,6 +18,7 @@
package org.apache.streams.elasticsearch.processor;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -32,6 +33,8 @@ import org.apache.streams.elasticsearch.ElasticsearchClientManager;
import org.apache.streams.elasticsearch.ElasticsearchConfigurator;
import org.apache.streams.elasticsearch.ElasticsearchReaderConfiguration;
import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.Serializable;
@@ -49,32 +52,24 @@ public class DocumentToMetadataProcessor implements StreamsProcessor, Serializab
public final static String STREAMS_ID = "DatumFromMetadataProcessor";
- private ElasticsearchClientManager elasticsearchClientManager;
- private ElasticsearchReaderConfiguration config;
-
private ObjectMapper mapper;
- public DocumentToMetadataProcessor() {
- Config config = StreamsConfigurator.config.getConfig("elasticsearch");
- this.config = ElasticsearchConfigurator.detectReaderConfiguration(config);
- }
+ private static final Logger LOGGER = LoggerFactory.getLogger(DocumentToMetadataProcessor.class);
- public DocumentToMetadataProcessor(Config config) {
- this.config = ElasticsearchConfigurator.detectReaderConfiguration(config);
- }
-
- public DocumentToMetadataProcessor(ElasticsearchReaderConfiguration config) {
- this.config = config;
+ public DocumentToMetadataProcessor() {
}
@Override
public List<StreamsDatum> process(StreamsDatum entry) {
List<StreamsDatum> result = Lists.newArrayList();
+ Object object = entry.getDocument();
ObjectNode metadataObjectNode;
try {
- metadataObjectNode = mapper.readValue((String) entry.getDocument(), ObjectNode.class);
- } catch (IOException e) {
+ String docAsJson = (object instanceof String) ? object.toString() : mapper.writeValueAsString(object);
+ metadataObjectNode = mapper.readValue(docAsJson, ObjectNode.class);
+ } catch (Throwable e) {
+ LOGGER.warn("Exception: %s", e.getMessage());
return result;
}
@@ -92,15 +87,13 @@ public class DocumentToMetadataProcessor implements StreamsProcessor, Serializab
@Override
public void prepare(Object configurationObject) {
- this.elasticsearchClientManager = new ElasticsearchClientManager(config);
mapper = StreamsJacksonMapper.getInstance();
mapper.registerModule(new JsonOrgModule());
-
}
@Override
public void cleanUp() {
- this.elasticsearchClientManager.getClient().close();
+ mapper = null;
}
public static Map<String, Object> asMap(JsonNode node) {
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cf4fd772/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchConfiguration.json b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchConfiguration.json
index 6524dcc..e2ed37c 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchConfiguration.json
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchConfiguration.json
@@ -14,7 +14,8 @@
},
"port": {
"type": "integer",
- "description": "Elasticsearch Transport API port"
+ "description": "Elasticsearch Transport API port",
+ "default": 9300
},
"clusterName": {
"type": "string",
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cf4fd772/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestDatumFromMetadataProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestDatumFromMetadataProcessor.java b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestDatumFromMetadataProcessor.java
new file mode 100644
index 0000000..66434bc
--- /dev/null
+++ b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestDatumFromMetadataProcessor.java
@@ -0,0 +1,81 @@
+package org.apache.streams.elasticsearch.test;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.lang.SerializationUtils;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.elasticsearch.ElasticsearchConfiguration;
+import org.apache.streams.elasticsearch.ElasticsearchPersistWriter;
+import org.apache.streams.elasticsearch.ElasticsearchReaderConfiguration;
+import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration;
+import org.apache.streams.elasticsearch.processor.DatumFromMetadataProcessor;
+import org.apache.streams.elasticsearch.processor.DocumentToMetadataProcessor;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.elasticsearch.test.ElasticsearchIntegrationTest;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Created by sblackmon on 10/20/14.
+ */
+@ElasticsearchIntegrationTest.ClusterScope(scope= ElasticsearchIntegrationTest.Scope.TEST, numNodes=1)
+public class TestDatumFromMetadataProcessor extends ElasticsearchIntegrationTest {
+
+ private final String TEST_INDEX = "TestDatumFromMetadataProcessor".toLowerCase();
+
+ private ElasticsearchReaderConfiguration testConfiguration;
+
+ @Test
+ public void testSerializability() {
+ DatumFromMetadataProcessor processor = new DatumFromMetadataProcessor(testConfiguration);
+
+ DatumFromMetadataProcessor clone = (DatumFromMetadataProcessor) SerializationUtils.clone(processor);
+ }
+
+ @Before
+ public void prepareTest() {
+
+ testConfiguration = new ElasticsearchReaderConfiguration();
+ testConfiguration.setHosts(Lists.newArrayList("localhost"));
+ testConfiguration.setClusterName(cluster().getClusterName());
+
+ String testJsonString = "{\"dummy\":\"true\"}";
+
+ client().index(client().prepareIndex(TEST_INDEX, "activity", "id").setSource(testJsonString).request()).actionGet(5, TimeUnit.SECONDS);
+
+ }
+
+ @Test
+ public void testDatumFromMetadataProcessor() {
+
+ Map<String, Object> metadata = Maps.newHashMap();
+
+ metadata.put("index", TEST_INDEX);
+ metadata.put("type", "activity");
+ metadata.put("id", "id");
+
+ DatumFromMetadataProcessor processor = new DatumFromMetadataProcessor(testConfiguration);
+
+ StreamsDatum testInput = new StreamsDatum(null);
+
+ testInput.setMetadata(metadata);
+
+ Assert.assertNull(testInput.document);
+
+ processor.prepare(null);
+
+ StreamsDatum testOutput = processor.process(testInput).get(0);
+
+ processor.cleanUp();
+
+ Assert.assertNotNull(testOutput.document);
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cf4fd772/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestDocumentToMetadataProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestDocumentToMetadataProcessor.java b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestDocumentToMetadataProcessor.java
new file mode 100644
index 0000000..b2bfa84
--- /dev/null
+++ b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestDocumentToMetadataProcessor.java
@@ -0,0 +1,63 @@
+package org.apache.streams.elasticsearch.test;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.SerializationUtils;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.elasticsearch.ElasticsearchReaderConfiguration;
+import org.apache.streams.elasticsearch.processor.DocumentToMetadataProcessor;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Created by sblackmon on 10/20/14.
+ */
+public class TestDocumentToMetadataProcessor {
+
+ private static ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+
+ @Before
+ public void prepareTest() {
+
+ }
+
+ @Test
+ public void testSerializability() {
+ DocumentToMetadataProcessor processor = new DocumentToMetadataProcessor();
+
+ DocumentToMetadataProcessor clone = (DocumentToMetadataProcessor) SerializationUtils.clone(processor);
+ }
+
+ @Test
+ public void testDocumentToMetadataProcessor() {
+
+ ObjectNode document = MAPPER.createObjectNode()
+ .put("a", "a")
+ .put("b", "b")
+ .put("c", 6);
+
+ DocumentToMetadataProcessor processor = new DocumentToMetadataProcessor();
+
+ StreamsDatum testInput = new StreamsDatum(document);
+
+ Assert.assertNotNull(testInput.document);
+ Assert.assertNotNull(testInput.metadata);
+ Assert.assertEquals(testInput.metadata.size(), 0);
+
+ processor.prepare(null);
+
+ StreamsDatum testOutput = processor.process(testInput).get(0);
+
+ processor.cleanUp();
+
+ Assert.assertNotNull(testOutput.metadata);
+ Assert.assertEquals(testInput.metadata.size(), 3);
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cf4fd772/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestElasticsearchPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestElasticsearchPersistWriter.java b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestElasticsearchPersistWriter.java
new file mode 100644
index 0000000..8452592
--- /dev/null
+++ b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestElasticsearchPersistWriter.java
@@ -0,0 +1,70 @@
+package org.apache.streams.elasticsearch.test;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.commons.lang.SerializationUtils;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.elasticsearch.ElasticsearchConfiguration;
+import org.apache.streams.elasticsearch.ElasticsearchPersistWriter;
+import org.apache.streams.elasticsearch.ElasticsearchReaderConfiguration;
+import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.test.ElasticsearchIntegrationTest;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Created by sblackmon on 10/20/14.
+ */
+@ElasticsearchIntegrationTest.ClusterScope(scope= ElasticsearchIntegrationTest.Scope.TEST, numNodes=1)
+public class TestElasticsearchPersistWriter extends ElasticsearchIntegrationTest {
+
+ private final String TEST_INDEX = "TestElasticsearchPersistWriter".toLowerCase();
+
+ private ElasticsearchWriterConfiguration testConfiguration;
+
+ public void prepareTest() {
+
+ testConfiguration = new ElasticsearchWriterConfiguration();
+ testConfiguration.setHosts(Lists.newArrayList("localhost"));
+ testConfiguration.setClusterName(cluster().getClusterName());
+
+ }
+
+ @Test
+ public void testPersistWriterString() {
+
+ ElasticsearchWriterConfiguration testConfiguration = new ElasticsearchWriterConfiguration();
+ testConfiguration.setHosts(Lists.newArrayList("localhost"));
+ testConfiguration.setClusterName(cluster().getClusterName());
+ testConfiguration.setBatchSize(1l);
+ testConfiguration.setIndex(TEST_INDEX);
+ testConfiguration.setType("string");
+ ElasticsearchPersistWriter testPersistWriter = new ElasticsearchPersistWriter(testConfiguration);
+ testPersistWriter.prepare(null);
+
+ String testJsonString = "{\"dummy\":\"true\"}";
+
+ assert(!indexExists(TEST_INDEX));
+
+ testPersistWriter.write(new StreamsDatum(testJsonString, "test"));
+
+ testPersistWriter.cleanUp();
+
+ flushAndRefresh();
+
+ assert(indexExists(TEST_INDEX));
+
+ long count = client().count(client().prepareCount().request()).actionGet().getCount();
+
+ assert(count > 0);
+
+ }
+}