You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2016/10/11 21:17:07 UTC
[2/7] incubator-streams-examples git commit: upgrade
elasticsearch-reindex to es 2.0 + docker
upgrade elasticsearch-reindex to es 2.0 + docker
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/commit/1444d257
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/tree/1444d257
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/diff/1444d257
Branch: refs/heads/master
Commit: 1444d25741dbf5315bf38d9bc6767c8d45a191cc
Parents: f76a7e0
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Authored: Fri Oct 7 10:58:19 2016 -0500
Committer: Steve Blackmon @steveblackmon <sb...@apache.org>
Committed: Sun Oct 9 16:40:49 2016 -0500
----------------------------------------------------------------------
local/elasticsearch-reindex/pom.xml | 124 +++++++++++---
.../src/site/markdown/index.md | 8 +-
.../test/ElasticsearchReindexChildIT.java | 121 ++++++++++++++
.../test/ElasticsearchReindexIT.java | 116 ++++++-------
.../test/ElasticsearchReindexParentChildIT.java | 162 -------------------
.../test/ElasticsearchReindexParentIT.java | 133 +++++++++++++++
.../example/elasticsearch/test/ReindexITs.java | 20 +++
.../resources/ActivityChildObjectParent.json | 18 ---
.../resources/ElasticsearchReindexChildIT.conf | 24 +++
.../test/resources/ElasticsearchReindexIT.conf | 24 +++
.../resources/ElasticsearchReindexParentIT.conf | 24 +++
.../src/test/resources/testReindex.json | 28 ----
.../src/test/resources/testReindexChild.json | 28 ----
.../src/test/resources/testReindexParent.json | 28 ----
14 files changed, 516 insertions(+), 342 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/1444d257/local/elasticsearch-reindex/pom.xml
----------------------------------------------------------------------
diff --git a/local/elasticsearch-reindex/pom.xml b/local/elasticsearch-reindex/pom.xml
index 325e564..d4c5830 100644
--- a/local/elasticsearch-reindex/pom.xml
+++ b/local/elasticsearch-reindex/pom.xml
@@ -33,8 +33,8 @@
<properties>
<docker.repo>apachestreams</docker.repo>
- <elasticsearch.version>1.1.0</elasticsearch.version>
- <lucene.version>4.7.2</lucene.version>
+ <elasticsearch.version>2.3.5</elasticsearch.version>
+ <lucene.version>5.5.0</lucene.version>
</properties>
<dependencies>
@@ -95,6 +95,13 @@
<version>0.4-incubating-SNAPSHOT</version>
</dependency>
<dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-persist-elasticsearch</artifactId>
+ <version>0.4-incubating-SNAPSHOT</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
<version>${slf4j.version}</version>
@@ -222,35 +229,43 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
- <version>2.4</version>
+ <configuration>
+ <includes>**/*.json,**/*.conf</includes>
+ <outputDirectory>${project.build.directory}/test-classes</outputDirectory>
+ <includeGroupIds>org.apache.streams</includeGroupIds>
+ <includeTypes>test-jar</includeTypes>
+ </configuration>
<executions>
<execution>
- <id>resource-dependencies</id>
+ <id>test-resource-dependencies</id>
<phase>process-test-resources</phase>
<goals>
<goal>unpack-dependencies</goal>
</goals>
- <configuration>
- <includeArtifactIds>streams-pojo</includeArtifactIds>
- <includes>**/*.json</includes>
- <outputDirectory>${project.build.directory}/test-classes</outputDirectory>
- </configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
- <version>2.12.4</version>
- <executions>
- <execution>
- <id>integration-tests</id>
- <goals>
- <goal>integration-test</goal>
- <goal>verify</goal>
- </goals>
- </execution>
- </executions>
+ <configuration>
+ <!-- Run integration test suite rather than individual tests. -->
+ <excludes>
+ <exclude>**/*Test.java</exclude>
+ <exclude>**/*Tests.java</exclude>
+ <exclude>**/*IT.java</exclude>
+ </excludes>
+ <includes>
+ <include>**/*ITs.java</include>
+ </includes>
+ </configuration>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.maven.surefire</groupId>
+ <artifactId>surefire-junit47</artifactId>
+ <version>${failsafe.plugin.version}</version>
+ </dependency>
+ </dependencies>
</plugin>
<plugin>
<groupId>io.fabric8</groupId>
@@ -259,4 +274,75 @@
</plugins>
</build>
+ <profiles>
+ <profile>
+ <id>dockerITs</id>
+ <activation>
+ <activeByDefault>false</activeByDefault>
+ <property>
+ <name>skipITs</name>
+ <value>false</value>
+ </property>
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>io.fabric8</groupId>
+ <artifactId>docker-maven-plugin</artifactId>
+ <version>${docker.plugin.version}</version>
+ <configuration combine.self="override">
+ <watchInterval>500</watchInterval>
+ <logDate>default</logDate>
+ <verbose>true</verbose>
+ <autoPull>on</autoPull>
+ <images>
+ <image>
+ <name>elasticsearch:2.3.5</name>
+ <alias>elasticsearch</alias>
+ <run>
+ <namingStrategy>none</namingStrategy>
+ <ports>
+ <port>${es.http.host}:${es.http.port}:9200</port>
+ <port>${es.tcp.host}:${es.tcp.port}:9300</port>
+ </ports>
+ <portPropertyFile>elasticsearch.properties</portPropertyFile>
+ <wait>
+ <log>elasticsearch startup</log>
+ <http>
+ <url>http://${es.http.host}:${es.http.port}</url>
+ <method>GET</method>
+ <status>200</status>
+ </http>
+ <time>20000</time>
+ <kill>1000</kill>
+ <shutdown>500</shutdown>
+ <!--<tcp>-->
+ <!--<host>${es.transport.host}</host>-->
+ <!--<ports>-->
+ <!--<port>${es.transport.port}</port>-->
+ <!--</ports>-->
+ <!--</tcp>-->
+ </wait>
+ <log>
+ <enabled>true</enabled>
+ <date>default</date>
+ <color>cyan</color>
+ </log>
+ </run>
+ <watch>
+ <mode>none</mode>
+ </watch>
+ </image>
+
+ </images>
+ </configuration>
+
+ </plugin>
+
+ </plugins>
+ </build>
+
+ </profile>
+ </profiles>
+
</project>
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/1444d257/local/elasticsearch-reindex/src/site/markdown/index.md
----------------------------------------------------------------------
diff --git a/local/elasticsearch-reindex/src/site/markdown/index.md b/local/elasticsearch-reindex/src/site/markdown/index.md
index d573137..6383281 100644
--- a/local/elasticsearch-reindex/src/site/markdown/index.md
+++ b/local/elasticsearch-reindex/src/site/markdown/index.md
@@ -28,11 +28,13 @@ Example Configuration:
Populate source and destination in configuration with cluster / index / type details.
-Build:
+Testing:
---------
- mvn clean package verify
-
+ mvn -PdockerITs docker:start
+ mvn clean install -DskipITs=false
+ mvn -PdockerITs docker:stop
+
Run (Local):
------------
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/1444d257/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/elasticsearch/test/ElasticsearchReindexChildIT.java
----------------------------------------------------------------------
diff --git a/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/elasticsearch/test/ElasticsearchReindexChildIT.java b/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/elasticsearch/test/ElasticsearchReindexChildIT.java
new file mode 100644
index 0000000..d033014
--- /dev/null
+++ b/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/elasticsearch/test/ElasticsearchReindexChildIT.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.example.elasticsearch.test;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfiguration;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.elasticsearch.ElasticsearchClientManager;
+import org.apache.streams.elasticsearch.example.ElasticsearchReindex;
+import org.apache.streams.elasticsearch.example.ElasticsearchReindexConfiguration;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
+import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
+import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
+import org.elasticsearch.action.search.SearchRequestBuilder;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.Requests;
+import org.elasticsearch.cluster.health.ClusterHealthStatus;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.Properties;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+/**
+ * Integration test: copies the documents of the configured source index/type
+ * into the destination index/type on the same cluster and compares hit counts.
+ * Needs a reachable Elasticsearch and an "elasticsearch.properties" file.
+ */
+public class ElasticsearchReindexChildIT {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchReindexChildIT.class);
+
+    ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+
+    protected ElasticsearchReindexConfiguration testConfiguration;
+    protected Client testClient;
+
+    /** Hits found in the source index/type before reindexing. */
+    private int count = 0;
+
+    /** Loads the test configuration and checks that the source index exists and is non-empty. */
+    @Before
+    public void prepareTest() throws Exception {
+        Config reference = ConfigFactory.load();
+        File conf_file = new File("target/test-classes/ElasticsearchReindexChildIT.conf");
+        // JUnit assertion instead of the assert keyword, which only runs when -ea is set.
+        assertTrue(conf_file.exists());
+        Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+        Properties es_properties = new Properties();
+        // try-with-resources releases the file handle even if load() throws.
+        try (InputStream es_stream = new FileInputStream("elasticsearch.properties")) {
+            es_properties.load(es_stream);
+        }
+        Config esProps = ConfigFactory.parseProperties(es_properties);
+        Config typesafe = testResourceConfig.withFallback(esProps).withFallback(reference).resolve();
+        StreamsConfiguration streams = StreamsConfigurator.detectConfiguration(typesafe);
+        testConfiguration = new ComponentConfigurator<>(ElasticsearchReindexConfiguration.class).detectConfiguration(typesafe);
+        testClient = new ElasticsearchClientManager(testConfiguration.getSource()).getClient();
+
+        // Cluster must not be RED; JUnit expects the unexpected value as first argument.
+        ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
+        ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
+        assertNotEquals(ClusterHealthStatus.RED, clusterHealthResponse.getStatus());
+
+        IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getSource().getIndexes().get(0));
+        IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
+        assertTrue(indicesExistsResponse.isExists());
+
+        SearchRequestBuilder countRequest = testClient
+                .prepareSearch(testConfiguration.getSource().getIndexes().get(0))
+                .setTypes(testConfiguration.getSource().getTypes().get(0));
+        SearchResponse countResponse = countRequest.execute().actionGet();
+        count = (int)countResponse.getHits().getTotalHits();
+        assertNotEquals(0, count);
+    }
+
+    /** Runs the reindex and expects the destination to hold as many hits as the source did. */
+    @Test
+    public void testReindex() throws Exception {
+        ElasticsearchReindex reindex = new ElasticsearchReindex(testConfiguration);
+        reindex.run();
+
+        // Count the documents now searchable in the destination index/type.
+        SearchRequestBuilder countRequest = testClient
+                .prepareSearch(testConfiguration.getDestination().getIndex())
+                .setTypes(testConfiguration.getDestination().getType());
+        SearchResponse countResponse = countRequest.execute().actionGet();
+        assertEquals(count, (int)countResponse.getHits().getTotalHits());
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/1444d257/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/elasticsearch/test/ElasticsearchReindexIT.java
----------------------------------------------------------------------
diff --git a/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/elasticsearch/test/ElasticsearchReindexIT.java b/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/elasticsearch/test/ElasticsearchReindexIT.java
index ebbe00e..5854ac0 100644
--- a/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/elasticsearch/test/ElasticsearchReindexIT.java
+++ b/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/elasticsearch/test/ElasticsearchReindexIT.java
@@ -19,98 +19,102 @@
package org.apache.streams.example.elasticsearch.test;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Lists;
-import org.apache.commons.io.Charsets;
-import org.apache.commons.io.IOUtils;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.elasticsearch.ElasticsearchConfiguration;
-import org.apache.streams.elasticsearch.ElasticsearchPersistWriter;
-import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfiguration;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.elasticsearch.ElasticsearchClientManager;
import org.apache.streams.elasticsearch.example.ElasticsearchReindex;
import org.apache.streams.elasticsearch.example.ElasticsearchReindexConfiguration;
import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.pojo.json.Activity;
-import org.elasticsearch.test.ElasticsearchIntegrationTest;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
+import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
+import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
+import org.elasticsearch.action.search.SearchRequestBuilder;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.Requests;
+import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
+import java.io.FileInputStream;
import java.io.InputStream;
-import java.util.*;
+import java.util.Properties;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
/**
* Test copying documents between two indexes on same cluster
*/
-@ElasticsearchIntegrationTest.ClusterScope(scope= ElasticsearchIntegrationTest.Scope.TEST, numNodes=1)
-public class ElasticsearchReindexIT extends ElasticsearchIntegrationTest {
+public class ElasticsearchReindexIT {
private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchReindexIT.class);
ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
- ElasticsearchConfiguration testConfiguration = new ElasticsearchConfiguration();
+ protected ElasticsearchReindexConfiguration testConfiguration;
+ protected Client testClient;
+
+ private int count = 0;
@Before
public void prepareTest() throws Exception {
- testConfiguration = new ElasticsearchConfiguration();
- testConfiguration.setHosts(Lists.newArrayList("localhost"));
- testConfiguration.setClusterName(cluster().getClusterName());
+ Config reference = ConfigFactory.load();
+ File conf_file = new File("target/test-classes/ElasticsearchReindexIT.conf");
+ assert(conf_file.exists());
+ Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+ Properties es_properties = new Properties();
+ InputStream es_stream = new FileInputStream("elasticsearch.properties");
+ es_properties.load(es_stream);
+ Config esProps = ConfigFactory.parseProperties(es_properties);
+ Config typesafe = testResourceConfig.withFallback(esProps).withFallback(reference).resolve();
+ StreamsConfiguration streams = StreamsConfigurator.detectConfiguration(typesafe);
+ testConfiguration = new ComponentConfigurator<>(ElasticsearchReindexConfiguration.class).detectConfiguration(typesafe);
+ testClient = new ElasticsearchClientManager(testConfiguration.getSource()).getClient();
- ElasticsearchWriterConfiguration setupWriterConfiguration = MAPPER.convertValue(testConfiguration, ElasticsearchWriterConfiguration.class);
- setupWriterConfiguration.setIndex("source");
- setupWriterConfiguration.setType("activity");
- setupWriterConfiguration.setBatchSize(5l);
+ ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
+ ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
+ assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED);
- ElasticsearchPersistWriter setupWriter = new ElasticsearchPersistWriter(setupWriterConfiguration);
- setupWriter.prepare(null);
+ IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getSource().getIndexes().get(0));
+ IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
+ assertTrue(indicesExistsResponse.isExists());
- InputStream testActivityFolderStream = ElasticsearchReindexIT.class.getClassLoader()
- .getResourceAsStream("activities");
- List<String> files = IOUtils.readLines(testActivityFolderStream, Charsets.UTF_8);
+ SearchRequestBuilder countRequest = testClient
+ .prepareSearch(testConfiguration.getSource().getIndexes().get(0))
+ .setTypes(testConfiguration.getSource().getTypes().get(0));
+ SearchResponse countResponse = countRequest.execute().actionGet();
- for( String file : files) {
- LOGGER.info("File: " + file );
- InputStream testActivityFileStream = ElasticsearchReindexIT.class.getClassLoader()
- .getResourceAsStream("activities/" + file);
- Activity activity = MAPPER.readValue(testActivityFileStream, Activity.class);
- StreamsDatum datum = new StreamsDatum(activity, activity.getVerb());
- setupWriter.write( datum );
- LOGGER.info("Wrote: " + activity.getVerb() );
- }
+ count = (int)countResponse.getHits().getTotalHits();
- setupWriter.cleanUp();
-
- flushAndRefresh();
+ assertNotEquals(count, 0);
}
@Test
public void testReindex() throws Exception {
- ElasticsearchReindexConfiguration reindexConfiguration = MAPPER.readValue(
- ElasticsearchReindexIT.class.getResourceAsStream("/testReindex.json"), ElasticsearchReindexConfiguration.class);
-
- reindexConfiguration.getDestination().setClusterName(cluster().getClusterName());
- reindexConfiguration.getSource().setClusterName(cluster().getClusterName());
-
- assert(indexExists("source"));
- long srcCount = client().count(client().prepareCount("source").request()).get().getCount();
- assert srcCount > 0;
-
- ElasticsearchReindex reindex = new ElasticsearchReindex(reindexConfiguration);
-
- Thread reindexThread = new Thread(reindex);
- reindexThread.start();
- reindexThread.join();
+ ElasticsearchReindex reindex = new ElasticsearchReindex(testConfiguration);
- flushAndRefresh();
+ reindex.run();
- assert(indexExists("destination"));
+ // assert lines in file
+ SearchRequestBuilder countRequest = testClient
+ .prepareSearch(testConfiguration.getDestination().getIndex())
+ .setTypes(testConfiguration.getDestination().getType());
+ SearchResponse countResponse = countRequest.execute().actionGet();
- long destCount = client().count(client().prepareCount("destination").request()).get().getCount();
- assert srcCount == destCount;
+ assertEquals(count, (int)countResponse.getHits().getTotalHits());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/1444d257/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/elasticsearch/test/ElasticsearchReindexParentChildIT.java
----------------------------------------------------------------------
diff --git a/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/elasticsearch/test/ElasticsearchReindexParentChildIT.java b/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/elasticsearch/test/ElasticsearchReindexParentChildIT.java
deleted file mode 100644
index 95a7684..0000000
--- a/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/elasticsearch/test/ElasticsearchReindexParentChildIT.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.example.elasticsearch.test;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.collect.Lists;
-import org.apache.commons.io.Charsets;
-import org.apache.commons.io.IOUtils;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.elasticsearch.ElasticsearchConfiguration;
-import org.apache.streams.elasticsearch.ElasticsearchPersistWriter;
-import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration;
-import org.apache.streams.elasticsearch.example.ElasticsearchReindex;
-import org.apache.streams.elasticsearch.example.ElasticsearchReindexConfiguration;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.pojo.json.ActivityObject;
-import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequestBuilder;
-import org.elasticsearch.test.ElasticsearchIntegrationTest;
-import org.junit.Before;
-import org.junit.Test;
-import org.reflections.Reflections;
-import org.reflections.scanners.SubTypesScanner;
-import org.reflections.util.ClasspathHelper;
-import org.reflections.util.ConfigurationBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.InputStream;
-import java.net.URL;
-import java.util.List;
-import java.util.Set;
-
-/**
- * Test copying parent/child associated documents between two indexes on same cluster
- */
-@ElasticsearchIntegrationTest.ClusterScope(scope= ElasticsearchIntegrationTest.Scope.TEST, numNodes=1)
-public class ElasticsearchReindexParentChildIT extends ElasticsearchIntegrationTest {
-
- private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchReindexIT.class);
-
- ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
-
- ElasticsearchConfiguration testConfiguration = new ElasticsearchConfiguration();
-
- @Before
- public void prepareTest() throws Exception {
-
- testConfiguration = new ElasticsearchConfiguration();
- testConfiguration.setHosts(Lists.newArrayList("localhost"));
- testConfiguration.setClusterName(cluster().getClusterName());
-
- PutIndexTemplateRequestBuilder putTemplateRequestBuilder = client().admin().indices().preparePutTemplate("mappings");
- URL templateURL = ElasticsearchReindexParentChildIT.class.getResource("/ActivityChildObjectParent.json");
- ObjectNode template = MAPPER.readValue(templateURL, ObjectNode.class);
- String templateSource = MAPPER.writeValueAsString(template);
- putTemplateRequestBuilder.setSource(templateSource);
-
- client().admin().indices().putTemplate(putTemplateRequestBuilder.request()).actionGet();
-
- Reflections reflections = new Reflections(new ConfigurationBuilder()
- .setUrls(ClasspathHelper.forPackage("org.apache.streams.pojo.json"))
- .setScanners(new SubTypesScanner()));
- Set<Class<? extends ActivityObject>> objectTypes = reflections.getSubTypesOf(ActivityObject.class);
-
- ElasticsearchWriterConfiguration setupWriterConfiguration = MAPPER.convertValue(testConfiguration, ElasticsearchWriterConfiguration.class);
- setupWriterConfiguration.setIndex("source");
- setupWriterConfiguration.setBatchSize(5l);
-
- ElasticsearchPersistWriter setupWriter = new ElasticsearchPersistWriter(setupWriterConfiguration);
- setupWriter.prepare(null);
-
- for( Class objectType : objectTypes ) {
- Object object = objectType.newInstance();
- ActivityObject activityObject = MAPPER.convertValue(object, ActivityObject.class);
- StreamsDatum datum = new StreamsDatum(activityObject, activityObject.getObjectType());
- datum.getMetadata().put("type", "object");
- setupWriter.write( datum );
- }
-
- InputStream testActivityFolderStream = ElasticsearchReindexIT.class.getClassLoader()
- .getResourceAsStream("activities");
- List<String> files = IOUtils.readLines(testActivityFolderStream, Charsets.UTF_8);
-
- for( String file : files) {
- LOGGER.info("File: " + file );
- InputStream testActivityFileStream = ElasticsearchReindexIT.class.getClassLoader()
- .getResourceAsStream("activities/" + file);
- Activity activity = MAPPER.readValue(testActivityFileStream, Activity.class);
- StreamsDatum datum = new StreamsDatum(activity, activity.getVerb());
- datum.getMetadata().put("parent", activity.getObject().getObjectType());
- datum.getMetadata().put("type", "activity");
- setupWriter.write( datum );
- LOGGER.info("Wrote: " + activity.getVerb() );
- }
-
- setupWriter.cleanUp();
-
- flushAndRefresh();
-
- }
-
- @Test
- public void testReindex() throws Exception {
-
- assert(indexExists("source"));
- long srcCount = client().count(client().prepareCount("source").request()).get().getCount();
- assert srcCount > 0;
-
- ElasticsearchReindexConfiguration reindexConfiguration = MAPPER.readValue(
- ElasticsearchReindexParentChildIT.class.getResourceAsStream("/testReindexParent.json"), ElasticsearchReindexConfiguration.class);
-
- reindexConfiguration.getDestination().setClusterName(cluster().getClusterName());
- reindexConfiguration.getSource().setClusterName(cluster().getClusterName());
-
- ElasticsearchReindex reindex = new ElasticsearchReindex(reindexConfiguration);
-
- Thread reindexThread = new Thread(reindex);
- reindexThread.start();
- reindexThread.join();
-
- flushAndRefresh();
-
- ElasticsearchReindexConfiguration reindexConfiguration2 = MAPPER.readValue(
- ElasticsearchReindexParentChildIT.class.getResourceAsStream("/testReindexChild.json"), ElasticsearchReindexConfiguration.class);
-
- reindexConfiguration2.getDestination().setClusterName(cluster().getClusterName());
- reindexConfiguration2.getSource().setClusterName(cluster().getClusterName());
-
- ElasticsearchReindex reindex2 = new ElasticsearchReindex(reindexConfiguration2);
-
- Thread reindexThread2 = new Thread(reindex2);
- reindexThread2.start();
- reindexThread2.join();
-
- flushAndRefresh();
-
- assert(indexExists("destination"));
-
- long destCount = client().count(client().prepareCount("destination").request()).get().getCount();
- assert srcCount == destCount;
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/1444d257/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/elasticsearch/test/ElasticsearchReindexParentIT.java
----------------------------------------------------------------------
diff --git a/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/elasticsearch/test/ElasticsearchReindexParentIT.java b/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/elasticsearch/test/ElasticsearchReindexParentIT.java
new file mode 100644
index 0000000..90924e7
--- /dev/null
+++ b/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/elasticsearch/test/ElasticsearchReindexParentIT.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.example.elasticsearch.test;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfiguration;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.elasticsearch.ElasticsearchClientManager;
+import org.apache.streams.elasticsearch.example.ElasticsearchReindex;
+import org.apache.streams.elasticsearch.example.ElasticsearchReindexConfiguration;
+import org.apache.streams.elasticsearch.test.ElasticsearchParentChildWriterIT;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
+import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
+import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
+import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequestBuilder;
+import org.elasticsearch.action.search.SearchRequestBuilder;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.Requests;
+import org.elasticsearch.cluster.health.ClusterHealthStatus;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.net.URL;
+import java.util.Properties;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+/**
+ * Integration test: copies parent/child associated documents from the configured
+ * source index/type to the configured destination and checks that the counts match.
+ */
+public class ElasticsearchReindexParentIT {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchReindexParentIT.class);
+    static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+
+    protected ElasticsearchReindexConfiguration testConfiguration;
+    protected Client testClient;
+
+    // number of documents found in the source index/type by prepareTest()
+    private int count = 0;
+
+    /**
+     * Loads the test configuration, checks that the cluster and source index are usable,
+     * records the source document count and installs the parent/child mapping template.
+     */
+    @Before
+    public void prepareTest() throws Exception {
+        Config reference = ConfigFactory.load();
+        File conf_file = new File("target/test-classes/ElasticsearchReindexParentIT.conf");
+        // JUnit assertion: the bare assert keyword is skipped unless the JVM runs with -ea
+        assertTrue(conf_file.exists());
+        Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+        Properties es_properties = new Properties();
+        // try-with-resources closes the stream even when load() throws
+        try (InputStream es_stream = new FileInputStream("elasticsearch.properties")) {
+            es_properties.load(es_stream);
+        }
+        Config esProps = ConfigFactory.parseProperties(es_properties);
+        // precedence: test resource file, then elasticsearch.properties, then reference config
+        Config typesafe = testResourceConfig.withFallback(esProps).withFallback(reference).resolve();
+        StreamsConfiguration streams = StreamsConfigurator.detectConfiguration(typesafe);
+        testConfiguration = new ComponentConfigurator<>(ElasticsearchReindexConfiguration.class).detectConfiguration(typesafe);
+        testClient = new ElasticsearchClientManager(testConfiguration.getSource()).getClient();
+        // the cluster must not be RED before anything is read from it
+        ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
+        ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
+        assertNotEquals(ClusterHealthStatus.RED, clusterHealthResponse.getStatus());
+        // the first configured source index must already exist
+        IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getSource().getIndexes().get(0));
+        IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
+        assertTrue(indicesExistsResponse.isExists());
+        // remember how many documents the source index/type holds
+        SearchRequestBuilder countRequest = testClient
+                .prepareSearch(testConfiguration.getSource().getIndexes().get(0))
+                .setTypes(testConfiguration.getSource().getTypes().get(0));
+        SearchResponse countResponse = countRequest.execute().actionGet();
+        count = (int)countResponse.getHits().getTotalHits();
+        // install the index template named "mappings" from the classpath resource
+        PutIndexTemplateRequestBuilder putTemplateRequestBuilder = testClient.admin().indices().preparePutTemplate("mappings");
+        URL templateURL = ElasticsearchParentChildWriterIT.class.getResource("/ActivityChildObjectParent.json");
+        assertTrue("/ActivityChildObjectParent.json not found on the test classpath", templateURL != null);
+        ObjectNode template = MAPPER.readValue(templateURL, ObjectNode.class);
+        String templateSource = MAPPER.writeValueAsString(template);
+        putTemplateRequestBuilder.setSource(templateSource);
+        testClient.admin().indices().putTemplate(putTemplateRequestBuilder.request()).actionGet();
+        assertNotEquals(0, count);
+    }
+
+    /** Runs the reindex, then expects the destination to hold as many documents as the source. */
+    @Test
+    public void testReindex() throws Exception {
+        ElasticsearchReindex reindex = new ElasticsearchReindex(testConfiguration);
+        reindex.run();
+        // count what landed in the destination index/type
+        SearchRequestBuilder countRequest = testClient
+                .prepareSearch(testConfiguration.getDestination().getIndex())
+                .setTypes(testConfiguration.getDestination().getType());
+        SearchResponse countResponse = countRequest.execute().actionGet();
+        assertEquals(count, (int)countResponse.getHits().getTotalHits());
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/1444d257/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/elasticsearch/test/ReindexITs.java
----------------------------------------------------------------------
diff --git a/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/elasticsearch/test/ReindexITs.java b/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/elasticsearch/test/ReindexITs.java
new file mode 100644
index 0000000..9c46d31
--- /dev/null
+++ b/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/elasticsearch/test/ReindexITs.java
@@ -0,0 +1,20 @@
+package org.apache.streams.example.elasticsearch.test;
+
+import org.apache.streams.elasticsearch.test.ElasticsearchParentChildWriterIT;
+import org.apache.streams.elasticsearch.test.ElasticsearchPersistWriterIT;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+/** Suite holder: runs the two Elasticsearch writer ITs listed first, then the three reindex ITs. */
+@RunWith(Suite.class)
+@Suite.SuiteClasses({
+    ElasticsearchPersistWriterIT.class,
+    ElasticsearchParentChildWriterIT.class,
+    ElasticsearchReindexIT.class,
+    ElasticsearchReindexParentIT.class,
+    ElasticsearchReindexChildIT.class
+})
+public class ReindexITs {
+    // intentionally empty:
+    // the class exists only to carry the annotations above
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/1444d257/local/elasticsearch-reindex/src/test/resources/ActivityChildObjectParent.json
----------------------------------------------------------------------
diff --git a/local/elasticsearch-reindex/src/test/resources/ActivityChildObjectParent.json b/local/elasticsearch-reindex/src/test/resources/ActivityChildObjectParent.json
deleted file mode 100644
index 1fa8f13..0000000
--- a/local/elasticsearch-reindex/src/test/resources/ActivityChildObjectParent.json
+++ /dev/null
@@ -1,18 +0,0 @@
-{
- "$license": [
- "http://www.apache.org/licenses/LICENSE-2.0"
- ],
- "template": "*",
- "order": 100,
- "mappings": {
- "object": {
- "dynamic": true
- },
- "activity": {
- "_parent": {
- "type": "object"
- },
- "dynamic": true
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/1444d257/local/elasticsearch-reindex/src/test/resources/ElasticsearchReindexChildIT.conf
----------------------------------------------------------------------
diff --git a/local/elasticsearch-reindex/src/test/resources/ElasticsearchReindexChildIT.conf b/local/elasticsearch-reindex/src/test/resources/ElasticsearchReindexChildIT.conf
new file mode 100644
index 0000000..936d6df
--- /dev/null
+++ b/local/elasticsearch-reindex/src/test/resources/ElasticsearchReindexChildIT.conf
@@ -0,0 +1,24 @@
+{
+ "$license": [
+ "http://www.apache.org/licenses/LICENSE-2.0"
+ ],
+ "source": {
+ hosts += ${es.tcp.host}
+ port = ${es.tcp.port}
+ "clusterName": "elasticsearch",
+ "indexes": [
+ "elasticsearch_persist_writer_parent_child_it"
+ ],
+ "types": [
+ "activity"
+ ]
+ },
+ "destination": {
+ hosts += ${es.tcp.host}
+ port = ${es.tcp.port}
+ "clusterName": "elasticsearch",
+ "index": "elasticsearch_reindex_parent_child_it",
+ "type": "activity",
+ "forceUseConfig": true
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/1444d257/local/elasticsearch-reindex/src/test/resources/ElasticsearchReindexIT.conf
----------------------------------------------------------------------
diff --git a/local/elasticsearch-reindex/src/test/resources/ElasticsearchReindexIT.conf b/local/elasticsearch-reindex/src/test/resources/ElasticsearchReindexIT.conf
new file mode 100644
index 0000000..0b69948
--- /dev/null
+++ b/local/elasticsearch-reindex/src/test/resources/ElasticsearchReindexIT.conf
@@ -0,0 +1,24 @@
+{
+ "$license": [
+ "http://www.apache.org/licenses/LICENSE-2.0"
+ ],
+ "source": {
+ hosts += ${es.tcp.host}
+ port = ${es.tcp.port}
+ "clusterName": "elasticsearch",
+ "indexes": [
+ "elasticsearch_persist_writer_it"
+ ],
+ "types": [
+ "activity"
+ ]
+ },
+ "destination": {
+ hosts += ${es.tcp.host}
+ port = ${es.tcp.port}
+ "clusterName": "elasticsearch",
+ "index": "elasticsearch_reindex_it",
+ "type": "activity",
+ "forceUseConfig": true
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/1444d257/local/elasticsearch-reindex/src/test/resources/ElasticsearchReindexParentIT.conf
----------------------------------------------------------------------
diff --git a/local/elasticsearch-reindex/src/test/resources/ElasticsearchReindexParentIT.conf b/local/elasticsearch-reindex/src/test/resources/ElasticsearchReindexParentIT.conf
new file mode 100644
index 0000000..936d6df
--- /dev/null
+++ b/local/elasticsearch-reindex/src/test/resources/ElasticsearchReindexParentIT.conf
@@ -0,0 +1,24 @@
+{
+ "$license": [
+ "http://www.apache.org/licenses/LICENSE-2.0"
+ ],
+ "source": {
+ hosts += ${es.tcp.host}
+ port = ${es.tcp.port}
+ "clusterName": "elasticsearch",
+ "indexes": [
+ "elasticsearch_persist_writer_parent_child_it"
+ ],
+ "types": [
+ "activity"
+ ]
+ },
+ "destination": {
+ hosts += ${es.tcp.host}
+ port = ${es.tcp.port}
+ "clusterName": "elasticsearch",
+ "index": "elasticsearch_reindex_parent_child_it",
+ "type": "activity",
+ "forceUseConfig": true
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/1444d257/local/elasticsearch-reindex/src/test/resources/testReindex.json
----------------------------------------------------------------------
diff --git a/local/elasticsearch-reindex/src/test/resources/testReindex.json b/local/elasticsearch-reindex/src/test/resources/testReindex.json
deleted file mode 100644
index dbc8c18..0000000
--- a/local/elasticsearch-reindex/src/test/resources/testReindex.json
+++ /dev/null
@@ -1,28 +0,0 @@
-{
- "$license": [
- "http://www.apache.org/licenses/LICENSE-2.0"
- ],
- "source": {
- "hosts": [
- "localhost"
- ],
- "port": 9300,
- "clusterName": "elasticsearch",
- "indexes": [
- "source"
- ],
- "types": [
- "activity"
- ]
- },
- "destination": {
- "hosts": [
- "localhost"
- ],
- "port": 9300,
- "clusterName": "elasticsearch",
- "index": "destination",
- "type": "activity",
- "forceUseConfig": true
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/1444d257/local/elasticsearch-reindex/src/test/resources/testReindexChild.json
----------------------------------------------------------------------
diff --git a/local/elasticsearch-reindex/src/test/resources/testReindexChild.json b/local/elasticsearch-reindex/src/test/resources/testReindexChild.json
deleted file mode 100644
index dbc8c18..0000000
--- a/local/elasticsearch-reindex/src/test/resources/testReindexChild.json
+++ /dev/null
@@ -1,28 +0,0 @@
-{
- "$license": [
- "http://www.apache.org/licenses/LICENSE-2.0"
- ],
- "source": {
- "hosts": [
- "localhost"
- ],
- "port": 9300,
- "clusterName": "elasticsearch",
- "indexes": [
- "source"
- ],
- "types": [
- "activity"
- ]
- },
- "destination": {
- "hosts": [
- "localhost"
- ],
- "port": 9300,
- "clusterName": "elasticsearch",
- "index": "destination",
- "type": "activity",
- "forceUseConfig": true
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/1444d257/local/elasticsearch-reindex/src/test/resources/testReindexParent.json
----------------------------------------------------------------------
diff --git a/local/elasticsearch-reindex/src/test/resources/testReindexParent.json b/local/elasticsearch-reindex/src/test/resources/testReindexParent.json
deleted file mode 100644
index 3d7568a..0000000
--- a/local/elasticsearch-reindex/src/test/resources/testReindexParent.json
+++ /dev/null
@@ -1,28 +0,0 @@
-{
- "$license": [
- "http://www.apache.org/licenses/LICENSE-2.0"
- ],
- "source": {
- "hosts": [
- "localhost"
- ],
- "port": 9300,
- "clusterName": "elasticsearch",
- "indexes": [
- "source"
- ],
- "types": [
- "object"
- ]
- },
- "destination": {
- "hosts": [
- "localhost"
- ],
- "port": 9300,
- "clusterName": "elasticsearch",
- "index": "destination",
- "type": "object",
- "forceUseConfig": true
- }
-}