You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2016/10/11 21:17:08 UTC

[3/7] incubator-streams-examples git commit: upgrade twitter-userstream-elasticsearch to es 2.0 + docker

upgrade twitter-userstream-elasticsearch to es 2.0 + docker

needs changes to incubator-streams from STREAMS-427


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/commit/4a70f9a0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/tree/4a70f9a0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/diff/4a70f9a0

Branch: refs/heads/master
Commit: 4a70f9a05b751e210953bc860824c06abd72a7af
Parents: 05a1838
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Authored: Sun Oct 9 16:25:04 2016 -0500
Committer: Steve Blackmon @steveblackmon <sb...@apache.org>
Committed: Sun Oct 9 16:41:39 2016 -0500

----------------------------------------------------------------------
 local/twitter-userstream-elasticsearch/pom.xml  | 104 +++++++++++++++--
 .../twitter/TwitterUserstreamElasticsearch.json |  13 ---
 ...terUserstreamElasticsearchConfiguration.json |  13 +++
 .../src/main/resources/application.conf         |  37 -------
 .../src/site/markdown/index.md                  |  22 +++-
 .../test/TwitterUserstreamElasticsearchIT.java  | 111 +++++++++++++++++++
 .../TwitterUserstreamElasticsearchIT.conf       |  14 +++
 7 files changed, 251 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4a70f9a0/local/twitter-userstream-elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/local/twitter-userstream-elasticsearch/pom.xml b/local/twitter-userstream-elasticsearch/pom.xml
index 1b7b64f..8e14913 100644
--- a/local/twitter-userstream-elasticsearch/pom.xml
+++ b/local/twitter-userstream-elasticsearch/pom.xml
@@ -34,8 +34,8 @@
 
     <properties>
         <docker.repo>apachestreams</docker.repo>
-        <elasticsearch.version>1.1.0</elasticsearch.version>
-        <lucene.version>4.7.2</lucene.version>
+        <elasticsearch.version>2.3.5</elasticsearch.version>
+        <lucene.version>5.5.0</lucene.version>
     </properties>
 
     <dependencies>
@@ -208,22 +208,102 @@
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-failsafe-plugin</artifactId>
-                <version>2.12.4</version>
-                <executions>
-                    <execution>
-                        <id>integration-tests</id>
-                        <goals>
-                            <goal>integration-test</goal>
-                            <goal>verify</goal>
-                        </goals>
-                    </execution>
-                </executions>
+                <configuration>
+                    <!-- Run integration test suite rather than individual tests. -->
+                    <excludes>
+                        <exclude>**/*Test.java</exclude>
+                        <exclude>**/*Tests.java</exclude>
+                    </excludes>
+                    <includes>
+                        <include>**/*IT.java</include>
+                        <include>**/*ITs.java</include>
+                    </includes>
+                </configuration>
+                <dependencies>
+                    <dependency>
+                        <groupId>org.apache.maven.surefire</groupId>
+                        <artifactId>surefire-junit47</artifactId>
+                        <version>${failsafe.plugin.version}</version>
+                    </dependency>
+                </dependencies>
             </plugin>
             <plugin>
                 <groupId>io.fabric8</groupId>
                 <artifactId>docker-maven-plugin</artifactId>
+                <version>${docker.plugin.version}</version>
             </plugin>
         </plugins>
     </build>
 
+
+    <profiles>
+        <profile>
+            <id>dockerITs</id>
+            <activation>
+                <activeByDefault>false</activeByDefault>
+                <property>
+                    <name>skipITs</name>
+                    <value>false</value>
+                </property>
+            </activation>
+            <build>
+                <plugins>
+                    <plugin>
+                        <groupId>io.fabric8</groupId>
+                        <artifactId>docker-maven-plugin</artifactId>
+                        <version>${docker.plugin.version}</version>
+                        <configuration combine.self="override">
+                            <watchInterval>500</watchInterval>
+                            <logDate>default</logDate>
+                            <verbose>true</verbose>
+                            <autoPull>on</autoPull>
+                            <images>
+                                <image>
+                                    <name>elasticsearch:2.3.5</name>
+                                    <alias>elasticsearch</alias>
+                                    <run>
+                                        <namingStrategy>none</namingStrategy>
+                                        <ports>
+                                            <port>${es.http.host}:${es.http.port}:9200</port>
+                                            <port>${es.tcp.host}:${es.tcp.port}:9300</port>
+                                        </ports>
+                                        <portPropertyFile>elasticsearch.properties</portPropertyFile>
+                                        <wait>
+                                            <log>elasticsearch startup</log>
+                                            <http>
+                                                <url>http://${es.http.host}:${es.http.port}</url>
+                                                <method>GET</method>
+                                                <status>200</status>
+                                            </http>
+                                            <time>20000</time>
+                                            <kill>1000</kill>
+                                            <shutdown>500</shutdown>
+                                            <!--<tcp>-->
+                                            <!--<host>${es.transport.host}</host>-->
+                                            <!--<ports>-->
+                                            <!--<port>${es.transport.port}</port>-->
+                                            <!--</ports>-->
+                                            <!--</tcp>-->
+                                        </wait>
+                                        <log>
+                                            <enabled>true</enabled>
+                                            <date>default</date>
+                                            <color>cyan</color>
+                                        </log>
+                                    </run>
+                                    <watch>
+                                        <mode>none</mode>
+                                    </watch>
+                                </image>
+
+                            </images>
+                        </configuration>
+
+                    </plugin>
+
+                </plugins>
+            </build>
+
+        </profile>
+    </profiles>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4a70f9a0/local/twitter-userstream-elasticsearch/src/main/jsonschema/org/apache/streams/example/twitter/TwitterUserstreamElasticsearch.json
----------------------------------------------------------------------
diff --git a/local/twitter-userstream-elasticsearch/src/main/jsonschema/org/apache/streams/example/twitter/TwitterUserstreamElasticsearch.json b/local/twitter-userstream-elasticsearch/src/main/jsonschema/org/apache/streams/example/twitter/TwitterUserstreamElasticsearch.json
deleted file mode 100644
index 6a25850..0000000
--- a/local/twitter-userstream-elasticsearch/src/main/jsonschema/org/apache/streams/example/twitter/TwitterUserstreamElasticsearch.json
+++ /dev/null
@@ -1,13 +0,0 @@
-{
-  "$schema": "http://json-schema.org/draft-03/schema",
-  "$license": [
-    "http://www.apache.org/licenses/LICENSE-2.0"
-  ],
-  "type": "object",
-  "javaType" : "org.apache.streams.example.twitter.TwitterUserstreamElasticsearchConfiguration",
-  "javaInterfaces": ["java.io.Serializable"],
-  "properties": {
-    "twitter": { "javaType": "org.apache.streams.twitter.TwitterStreamConfiguration", "type": "object", "required": true },
-    "elasticsearch": { "javaType": "org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration", "type": "object", "required": true }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4a70f9a0/local/twitter-userstream-elasticsearch/src/main/jsonschema/org/apache/streams/example/twitter/TwitterUserstreamElasticsearchConfiguration.json
----------------------------------------------------------------------
diff --git a/local/twitter-userstream-elasticsearch/src/main/jsonschema/org/apache/streams/example/twitter/TwitterUserstreamElasticsearchConfiguration.json b/local/twitter-userstream-elasticsearch/src/main/jsonschema/org/apache/streams/example/twitter/TwitterUserstreamElasticsearchConfiguration.json
new file mode 100644
index 0000000..6a25850
--- /dev/null
+++ b/local/twitter-userstream-elasticsearch/src/main/jsonschema/org/apache/streams/example/twitter/TwitterUserstreamElasticsearchConfiguration.json
@@ -0,0 +1,13 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema",
+  "$license": [
+    "http://www.apache.org/licenses/LICENSE-2.0"
+  ],
+  "type": "object",
+  "javaType" : "org.apache.streams.example.twitter.TwitterUserstreamElasticsearchConfiguration",
+  "javaInterfaces": ["java.io.Serializable"],
+  "properties": {
+    "twitter": { "javaType": "org.apache.streams.twitter.TwitterStreamConfiguration", "type": "object", "required": true },
+    "elasticsearch": { "javaType": "org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration", "type": "object", "required": true }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4a70f9a0/local/twitter-userstream-elasticsearch/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/local/twitter-userstream-elasticsearch/src/main/resources/application.conf b/local/twitter-userstream-elasticsearch/src/main/resources/application.conf
deleted file mode 100644
index ef5f023..0000000
--- a/local/twitter-userstream-elasticsearch/src/main/resources/application.conf
+++ /dev/null
@@ -1,37 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-twitter {
-    endpoint = "userstream"
-    oauth {
-        consumerKey = ""
-        consumerSecret = ""
-        accessToken = ""
-        accessTokenSecret = ""
-    }
-    follow = [
-\u2002 \u2002 \u2002 \u2002 \u2002 \u2002
-\u2002 \u2002 ]
-}
-elasticsearch {
-    hosts = [
-        localhost
-    ]
-    port = 9300
-    clusterName = elasticsearch
-    index = userstream_activity
-    type = activity
-    batchSize = 1
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4a70f9a0/local/twitter-userstream-elasticsearch/src/site/markdown/index.md
----------------------------------------------------------------------
diff --git a/local/twitter-userstream-elasticsearch/src/site/markdown/index.md b/local/twitter-userstream-elasticsearch/src/site/markdown/index.md
index baeee26..a10846c 100644
--- a/local/twitter-userstream-elasticsearch/src/site/markdown/index.md
+++ b/local/twitter-userstream-elasticsearch/src/site/markdown/index.md
@@ -33,8 +33,28 @@ The accessToken and accessTokenSecret can be obtained by navigating to:
 Build:
 ---------
 
-    mvn clean package verify
+    mvn clean package
 
+Testing:
+---------
+
+Create a local file `application.conf` containing valid Twitter credentials:
+
+    twitter {
+      oauth {
+        consumerKey = ""
+        consumerSecret = ""
+        accessToken = ""
+        accessTokenSecret = ""
+      }
+    }
+    
+Start Elasticsearch in Docker, build with integration tests enabled using your credentials, then stop the container:
+
+    mvn -PdockerITs docker:start
+    mvn clean test verify -DskipITs=false -DargLine="-Dconfig.file=`pwd`/application.conf"
+    mvn -PdockerITs docker:stop
+        
 Run (Local):
 ------------
 

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4a70f9a0/local/twitter-userstream-elasticsearch/src/test/java/org/apache/streams/example/twitter/test/TwitterUserstreamElasticsearchIT.java
----------------------------------------------------------------------
diff --git a/local/twitter-userstream-elasticsearch/src/test/java/org/apache/streams/example/twitter/test/TwitterUserstreamElasticsearchIT.java b/local/twitter-userstream-elasticsearch/src/test/java/org/apache/streams/example/twitter/test/TwitterUserstreamElasticsearchIT.java
new file mode 100644
index 0000000..2f524f0
--- /dev/null
+++ b/local/twitter-userstream-elasticsearch/src/test/java/org/apache/streams/example/twitter/test/TwitterUserstreamElasticsearchIT.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.example.twitter.test;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfiguration;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.elasticsearch.ElasticsearchClientManager;
+import org.apache.streams.example.twitter.TwitterUserstreamElasticsearchConfiguration;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.twitter.example.TwitterUserstreamElasticsearch;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
+import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
+import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
+import org.elasticsearch.action.search.SearchRequestBuilder;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.Requests;
+import org.elasticsearch.cluster.health.ClusterHealthStatus;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.Properties;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+
+/**
+ * Integration test: runs the TwitterUserstreamElasticsearch example and checks that documents reach the configured Elasticsearch index.
+ */
+public class TwitterUserstreamElasticsearchIT {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(TwitterUserstreamElasticsearchIT.class);
+
+    protected TwitterUserstreamElasticsearchConfiguration testConfiguration; // built in prepareTest() from the merged config
+    protected Client testClient; // Elasticsearch client obtained in prepareTest()
+
+    private int count = 0; // total hits found by the search in testReindex()
+
+    @Before
+    public void prepareTest() throws Exception {
+
+        Config reference  = ConfigFactory.load(); // lowest-priority fallback in the merge below
+        File conf_file = new File("target/test-classes/TwitterUserstreamElasticsearchIT.conf");
+        assert(conf_file.exists()); // NOTE(review): plain Java assert, only evaluated when the JVM runs with -ea
+        Config testResourceConfig  = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+        Properties es_properties  = new Properties();
+        InputStream es_stream  = new FileInputStream("elasticsearch.properties"); // relative path; NOTE(review): stream is never closed in this method
+        es_properties.load(es_stream);
+        Config esProps  = ConfigFactory.parseProperties(es_properties);
+        Config typesafe  = testResourceConfig.withFallback(esProps).withFallback(reference).resolve(); // precedence: test conf, then properties file, then reference
+        StreamsConfiguration streams  = StreamsConfigurator.detectConfiguration(typesafe); // NOTE(review): 'streams' is not read again in this class
+        testConfiguration = new ComponentConfigurator<>(TwitterUserstreamElasticsearchConfiguration.class).detectConfiguration(typesafe);
+        testClient = new ElasticsearchClientManager(testConfiguration.getElasticsearch()).getClient();
+
+        ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
+        ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
+        assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED); // cluster must not be RED; NOTE(review): JUnit expects (unexpected, actual) order
+
+        IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getElasticsearch().getIndex());
+        IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
+        assertFalse(indicesExistsResponse.isExists()); // the target index must not exist before the stream runs
+
+    }
+
+    @Test
+    public void testReindex() throws Exception { // NOTE(review): no reindexing happens here; the name looks inherited from another test
+
+        TwitterUserstreamElasticsearch stream = new TwitterUserstreamElasticsearch(testConfiguration);
+
+        stream.run(); // presumably returns once the stream has finished writing - TODO confirm
+
+        // search the configured index and type to count the documents that were written
+        SearchRequestBuilder countRequest = testClient
+                .prepareSearch(testConfiguration.getElasticsearch().getIndex())
+                .setTypes(testConfiguration.getElasticsearch().getType());
+        SearchResponse countResponse = countRequest.execute().actionGet();
+
+        count = (int)countResponse.getHits().getTotalHits();
+
+        assertNotEquals(count, 0); // at least one document must have been indexed
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4a70f9a0/local/twitter-userstream-elasticsearch/src/test/resources/TwitterUserstreamElasticsearchIT.conf
----------------------------------------------------------------------
diff --git a/local/twitter-userstream-elasticsearch/src/test/resources/TwitterUserstreamElasticsearchIT.conf b/local/twitter-userstream-elasticsearch/src/test/resources/TwitterUserstreamElasticsearchIT.conf
new file mode 100644
index 0000000..ae3b463
--- /dev/null
+++ b/local/twitter-userstream-elasticsearch/src/test/resources/TwitterUserstreamElasticsearchIT.conf
@@ -0,0 +1,14 @@
+twitter {
+  endpoint = sample
+  track = [
+    "data"
+  ]
+}
+elasticsearch {
+  hosts += ${es.tcp.host}
+  port = ${es.tcp.port}
+  clusterName = elasticsearch
+  index = twitter_userstream_elasticsearch_it
+  type = activity
+  forceUseConfig = true
+}
\ No newline at end of file