You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2016/10/14 16:24:50 UTC
[2/9] incubator-streams-examples git commit: normalize package names
in streams-examples/local
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/twitter-follow-neo4j/src/main/java/org/apache/streams/example/TwitterFollowNeo4j.java
----------------------------------------------------------------------
diff --git a/local/twitter-follow-neo4j/src/main/java/org/apache/streams/example/TwitterFollowNeo4j.java b/local/twitter-follow-neo4j/src/main/java/org/apache/streams/example/TwitterFollowNeo4j.java
new file mode 100644
index 0000000..34ac8c4
--- /dev/null
+++ b/local/twitter-follow-neo4j/src/main/java/org/apache/streams/example/TwitterFollowNeo4j.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.example;
+
+import com.google.common.collect.Lists;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.converter.ActivityConverterProcessor;
+import org.apache.streams.converter.ActivityConverterProcessorConfiguration;
+import org.apache.streams.converter.TypeConverterProcessor;
+import org.apache.streams.core.StreamBuilder;
+import org.apache.streams.data.ActivityConverter;
+import org.apache.streams.data.DocumentClassifier;
+import org.apache.streams.example.TwitterFollowNeo4jConfiguration;
+import org.apache.streams.graph.GraphHttpConfiguration;
+import org.apache.streams.graph.GraphHttpPersistWriter;
+import org.apache.streams.local.builders.LocalStreamBuilder;
+import org.apache.streams.twitter.TwitterFollowingConfiguration;
+import org.apache.streams.twitter.converter.TwitterFollowActivityConverter;
+import org.apache.streams.twitter.provider.TwitterFollowingProvider;
+import org.apache.streams.twitter.converter.TwitterDocumentClassifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Collects friend and follow connections for a set of twitter users and builds a graph
+ * database in neo4j.
+ */
+public class TwitterFollowNeo4j implements Runnable {
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(TwitterFollowNeo4j.class);
+
+ TwitterFollowNeo4jConfiguration config;
+
+ public TwitterFollowNeo4j() {
+ this(new ComponentConfigurator<>(TwitterFollowNeo4jConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig()));
+ }
+
+ public TwitterFollowNeo4j(TwitterFollowNeo4jConfiguration config) {
+ this.config = config;
+ }
+
+ public void run() {
+
+ TwitterFollowingConfiguration twitterFollowingConfiguration = config.getTwitter();
+ TwitterFollowingProvider followingProvider = new TwitterFollowingProvider(twitterFollowingConfiguration);
+ TypeConverterProcessor converter = new TypeConverterProcessor(String.class);
+
+ ActivityConverterProcessorConfiguration activityConverterProcessorConfiguration =
+ new ActivityConverterProcessorConfiguration()
+ .withClassifiers(Lists.newArrayList((DocumentClassifier) new TwitterDocumentClassifier()))
+ .withConverters(Lists.newArrayList((ActivityConverter) new TwitterFollowActivityConverter()));
+ ActivityConverterProcessor activity = new ActivityConverterProcessor(activityConverterProcessorConfiguration);
+
+ GraphHttpConfiguration graphWriterConfiguration = config.getGraph();
+ GraphHttpPersistWriter graphPersistWriter = new GraphHttpPersistWriter(graphWriterConfiguration);
+
+ StreamBuilder builder = new LocalStreamBuilder();
+ builder.newPerpetualStream(TwitterFollowingProvider.STREAMS_ID, followingProvider);
+ builder.addStreamsProcessor("converter", converter, 1, TwitterFollowingProvider.STREAMS_ID);
+ builder.addStreamsProcessor("activity", activity, 1, "converter");
+ builder.addStreamsPersistWriter("graph", graphPersistWriter, 1, "activity");
+
+ builder.start();
+ }
+
+ public static void main(String[] args) {
+
+ LOGGER.info(StreamsConfigurator.config.toString());
+
+ TwitterFollowNeo4j stream = new TwitterFollowNeo4j();
+
+ stream.run();
+
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/twitter-follow-neo4j/src/main/jsonschema/TwitterFollowNeo4jConfiguration.json
----------------------------------------------------------------------
diff --git a/local/twitter-follow-neo4j/src/main/jsonschema/TwitterFollowNeo4jConfiguration.json b/local/twitter-follow-neo4j/src/main/jsonschema/TwitterFollowNeo4jConfiguration.json
new file mode 100644
index 0000000..ffbd39d
--- /dev/null
+++ b/local/twitter-follow-neo4j/src/main/jsonschema/TwitterFollowNeo4jConfiguration.json
@@ -0,0 +1,13 @@
+{
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "$license": [
+ "http://www.apache.org/licenses/LICENSE-2.0"
+ ],
+ "type": "object",
+ "javaType" : "org.apache.streams.example.TwitterFollowNeo4jConfiguration",
+ "javaInterfaces": ["java.io.Serializable"],
+ "properties": {
+ "twitter": { "javaType": "org.apache.streams.twitter.TwitterFollowingConfiguration", "type": "object", "required": true },
+ "graph": { "javaType": "org.apache.streams.graph.GraphHttpConfiguration", "type": "object", "required": true }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/twitter-follow-neo4j/src/main/resources/TwitterFollowNeo4j.dot
----------------------------------------------------------------------
diff --git a/local/twitter-follow-neo4j/src/main/resources/TwitterFollowNeo4j.dot b/local/twitter-follow-neo4j/src/main/resources/TwitterFollowNeo4j.dot
new file mode 100644
index 0000000..2d9e495
--- /dev/null
+++ b/local/twitter-follow-neo4j/src/main/resources/TwitterFollowNeo4j.dot
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ digraph g {
+
+ //providers
+ TwitterFollowingProvider [label="TwitterFollowingProvider",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java"];
+
+ //processors
+ TypeConverterProcessor [label="TypeConverterProcessor",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-components/streams-converters/src/main/java/org/apache/streams/converters/TypeConverterProcessor.java"];
+ ActivityConverterProcessor [label="ActivityConverterProcessor",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-components/streams-converters/src/main/java/org/apache/streams/converters/ActivityConverterProcessor.java"];
+
+ //persisters
+ GraphPersistWriter [label="GraphPersistWriter",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphPersistWriter.java"];
+
+ //data
+ destination [label="http://{host}:{port}/db/data",shape=box];
+
+ //stream
+ TwitterFollowingProvider -> TypeConverterProcessor [label="Follow",URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/Follow.java"];
+ TypeConverterProcessor -> ActivityConverterProcessor [label="String"];
+ ActivityConverterProcessor -> GraphPersistWriter [label="Activity",URL="https://github.com/apache/incubator-streams/blob/master/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/activity.json"];
+ GraphPersistWriter -> destination
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/twitter-follow-neo4j/src/site/markdown/TwitterFollowNeo4j.md
----------------------------------------------------------------------
diff --git a/local/twitter-follow-neo4j/src/site/markdown/TwitterFollowNeo4j.md b/local/twitter-follow-neo4j/src/site/markdown/TwitterFollowNeo4j.md
new file mode 100644
index 0000000..936efb4
--- /dev/null
+++ b/local/twitter-follow-neo4j/src/site/markdown/TwitterFollowNeo4j.md
@@ -0,0 +1,33 @@
+### TwitterFollowNeo4j
+
+#### Description:
+
+Collects friend or follower connections for a set of twitter users to build a graph database in neo4j.
+
+#### Configuration:
+
+[TwitterFollowNeo4jIT.conf](TwitterFollowNeo4jIT.conf "TwitterFollowNeo4jIT.conf" )
+
+#### Run (SBT):
+
+ sbtx -210 -sbt-create
+ set resolvers += "Local Maven Repository" at "file://"+Path.userHome.absolutePath+"/.m2/repository"
+ set libraryDependencies += "org.apache.streams" % "twitter-follow-neo4j" % "0.4-incubating-SNAPSHOT"
+ set fork := true
+ set javaOptions +="-Dconfig.file=application.conf"
+ run org.apache.streams.example.graph.TwitterFollowNeo4j
+
+#### Run (Docker):
+
+ docker run apachestreams/twitter-follow-neo4j java -cp twitter-follow-neo4j-jar-with-dependencies.jar org.apache.streams.example.TwitterFollowNeo4j
+
+#### Specification:
+
+[TwitterFollowNeo4j.dot](TwitterFollowNeo4j.dot "TwitterFollowNeo4j.dot" )
+
+#### Diagram:
+
+![TwitterFollowNeo4j.dot.svg](./TwitterFollowNeo4j.dot.svg)
+
+
+###### Licensed under Apache License 2.0 - http://www.apache.org/licenses/LICENSE-2.0
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/twitter-follow-neo4j/src/site/markdown/index.md
----------------------------------------------------------------------
diff --git a/local/twitter-follow-neo4j/src/site/markdown/index.md b/local/twitter-follow-neo4j/src/site/markdown/index.md
new file mode 100644
index 0000000..3efdc5b
--- /dev/null
+++ b/local/twitter-follow-neo4j/src/site/markdown/index.md
@@ -0,0 +1,42 @@
+### twitter-follow-neo4j
+
+#### Requirements:
+ - Authorized Twitter API credentials
+ - A running Neo4J 3.0.0+ instance
+
+#### Streams:
+
+<a href="TwitterFollowNeo4j.html" target="_self">TwitterFollowNeo4j</a>
+
+#### Build:
+
+ mvn clean package verify
+
+#### Test:
+
+Create a local file `application.conf` with valid twitter credentials
+
+ twitter {
+ oauth {
+ consumerKey = ""
+ consumerSecret = ""
+ accessToken = ""
+ accessTokenSecret = ""
+ }
+ }
+
+Start up neo4j with docker:
+
+ mvn -PdockerITs docker:start
+
+Build with integration testing enabled, using your credentials
+
+ mvn clean test verify -DskipITs=false -DargLine="-Dconfig.file=`pwd`/application.conf"
+
+Shutdown neo4j when finished:
+
+ mvn -PdockerITs docker:stop
+
+[JavaDocs](apidocs/index.html "JavaDocs")
+
+###### Licensed under Apache License 2.0 - http://www.apache.org/licenses/LICENSE-2.0
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/twitter-follow-neo4j/src/site/resources/TwitterFollowGraph.dot
----------------------------------------------------------------------
diff --git a/local/twitter-follow-neo4j/src/site/resources/TwitterFollowGraph.dot b/local/twitter-follow-neo4j/src/site/resources/TwitterFollowGraph.dot
new file mode 100644
index 0000000..2d9e495
--- /dev/null
+++ b/local/twitter-follow-neo4j/src/site/resources/TwitterFollowGraph.dot
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ digraph g {
+
+ //providers
+ TwitterFollowingProvider [label="TwitterFollowingProvider",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java"];
+
+ //processors
+ TypeConverterProcessor [label="TypeConverterProcessor",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-components/streams-converters/src/main/java/org/apache/streams/converters/TypeConverterProcessor.java"];
+ ActivityConverterProcessor [label="ActivityConverterProcessor",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-components/streams-converters/src/main/java/org/apache/streams/converters/ActivityConverterProcessor.java"];
+
+ //persisters
+ GraphPersistWriter [label="GraphPersistWriter",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphPersistWriter.java"];
+
+ //data
+ destination [label="http://{host}:{port}/db/data",shape=box];
+
+ //stream
+ TwitterFollowingProvider -> TypeConverterProcessor [label="Follow",URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/Follow.java"];
+ TypeConverterProcessor -> ActivityConverterProcessor [label="String"];
+ ActivityConverterProcessor -> GraphPersistWriter [label="Activity",URL="https://github.com/apache/incubator-streams/blob/master/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/activity.json"];
+ GraphPersistWriter -> destination
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/twitter-follow-neo4j/src/site/resources/TwitterFollowGraphConfiguration.json
----------------------------------------------------------------------
diff --git a/local/twitter-follow-neo4j/src/site/resources/TwitterFollowGraphConfiguration.json b/local/twitter-follow-neo4j/src/site/resources/TwitterFollowGraphConfiguration.json
new file mode 100644
index 0000000..6025640
--- /dev/null
+++ b/local/twitter-follow-neo4j/src/site/resources/TwitterFollowGraphConfiguration.json
@@ -0,0 +1,13 @@
+{
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "$license": [
+ "http://www.apache.org/licenses/LICENSE-2.0"
+ ],
+ "type": "object",
+ "javaType" : "org.apache.streams.example.graph.TwitterFollowNeo4jConfiguration",
+ "javaInterfaces": ["java.io.Serializable"],
+ "properties": {
+ "twitter": { "javaType": "org.apache.streams.twitter.TwitterFollowingConfiguration", "type": "object", "required": true },
+ "graph": { "javaType": "org.apache.streams.graph.GraphHttpConfiguration", "type": "object", "required": true }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/twitter-follow-neo4j/src/site/resources/TwitterFollowNeo4j.dot
----------------------------------------------------------------------
diff --git a/local/twitter-follow-neo4j/src/site/resources/TwitterFollowNeo4j.dot b/local/twitter-follow-neo4j/src/site/resources/TwitterFollowNeo4j.dot
new file mode 100644
index 0000000..2d9e495
--- /dev/null
+++ b/local/twitter-follow-neo4j/src/site/resources/TwitterFollowNeo4j.dot
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ digraph g {
+
+ //providers
+ TwitterFollowingProvider [label="TwitterFollowingProvider",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java"];
+
+ //processors
+ TypeConverterProcessor [label="TypeConverterProcessor",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-components/streams-converters/src/main/java/org/apache/streams/converters/TypeConverterProcessor.java"];
+ ActivityConverterProcessor [label="ActivityConverterProcessor",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-components/streams-converters/src/main/java/org/apache/streams/converters/ActivityConverterProcessor.java"];
+
+ //persisters
+ GraphPersistWriter [label="GraphPersistWriter",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphPersistWriter.java"];
+
+ //data
+ destination [label="http://{host}:{port}/db/data",shape=box];
+
+ //stream
+ TwitterFollowingProvider -> TypeConverterProcessor [label="Follow",URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/Follow.java"];
+ TypeConverterProcessor -> ActivityConverterProcessor [label="String"];
+ ActivityConverterProcessor -> GraphPersistWriter [label="Activity",URL="https://github.com/apache/incubator-streams/blob/master/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/activity.json"];
+ GraphPersistWriter -> destination
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/twitter-follow-neo4j/src/site/resources/TwitterFollowNeo4jConfiguration.json
----------------------------------------------------------------------
diff --git a/local/twitter-follow-neo4j/src/site/resources/TwitterFollowNeo4jConfiguration.json b/local/twitter-follow-neo4j/src/site/resources/TwitterFollowNeo4jConfiguration.json
new file mode 100644
index 0000000..ffbd39d
--- /dev/null
+++ b/local/twitter-follow-neo4j/src/site/resources/TwitterFollowNeo4jConfiguration.json
@@ -0,0 +1,13 @@
+{
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "$license": [
+ "http://www.apache.org/licenses/LICENSE-2.0"
+ ],
+ "type": "object",
+ "javaType" : "org.apache.streams.example.TwitterFollowNeo4jConfiguration",
+ "javaInterfaces": ["java.io.Serializable"],
+ "properties": {
+ "twitter": { "javaType": "org.apache.streams.twitter.TwitterFollowingConfiguration", "type": "object", "required": true },
+ "graph": { "javaType": "org.apache.streams.graph.GraphHttpConfiguration", "type": "object", "required": true }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/twitter-follow-neo4j/src/site/site.xml
----------------------------------------------------------------------
diff --git a/local/twitter-follow-neo4j/src/site/site.xml b/local/twitter-follow-neo4j/src/site/site.xml
new file mode 100644
index 0000000..a25bae0
--- /dev/null
+++ b/local/twitter-follow-neo4j/src/site/site.xml
@@ -0,0 +1,45 @@
+<?xml version="1.0" encoding="ISO-8859-1"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
+<project>
+ <custom>
+ <fluidoSkin>
+ <topBarEnabled>false</topBarEnabled>
+ <navBarStyle>navbar-inverse</navBarStyle>
+ <sideBarEnabled>true</sideBarEnabled>
+ <!--<gitHub>-->
+ <!--<projectId>apache/incubator-streams-examples</projectId>-->
+ <!--<ribbonOrientation>right</ribbonOrientation>-->
+ <!--<ribbonColor>black</ribbonColor>-->
+ <!--</gitHub>-->
+ <!--<twitter>-->
+ <!--<user>ApacheStreams</user>-->
+ <!--<showUser>true</showUser>-->
+ <!--<showFollowers>true</showFollowers>-->
+ <!--</twitter>-->
+ </fluidoSkin>
+ </custom>
+ <body>
+ <menu name="Configuration">
+ <item name="Neo4j" href="../../services/neo4j.html"/>
+ </menu>
+ <menu name="Credentials">
+ <item name="Twitter" href="../../credentials/twitter.html"/>
+ </menu>
+ </body>
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/twitter-follow-neo4j/src/test/java/org/apache/streams/example/test/TwitterFollowNeo4jIT.java
----------------------------------------------------------------------
diff --git a/local/twitter-follow-neo4j/src/test/java/org/apache/streams/example/test/TwitterFollowNeo4jIT.java b/local/twitter-follow-neo4j/src/test/java/org/apache/streams/example/test/TwitterFollowNeo4jIT.java
new file mode 100644
index 0000000..2813b08
--- /dev/null
+++ b/local/twitter-follow-neo4j/src/test/java/org/apache/streams/example/test/TwitterFollowNeo4jIT.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.example.test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfiguration;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.example.TwitterFollowNeo4j;
+import org.apache.streams.example.TwitterFollowNeo4jConfiguration;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.Properties;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+
+/**
+ * Example stream that populates elasticsearch with activities from twitter userstream in real-time
+ */
+public class TwitterFollowNeo4jIT {
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(TwitterFollowNeo4jIT.class);
+
+ protected TwitterFollowNeo4jConfiguration testConfiguration;
+
+ private int count = 0;
+
+ @Before
+ public void prepareTest() throws Exception {
+
+ Config reference = ConfigFactory.load();
+ File conf_file = new File("target/test-classes/TwitterFollowGraphIT.conf");
+ assert(conf_file.exists());
+ Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+ Properties graph_properties = new Properties();
+ InputStream graph_stream = new FileInputStream("neo4j.properties");
+ graph_properties.load(graph_stream);
+ Config graphProps = ConfigFactory.parseProperties(graph_properties);
+ Config typesafe = testResourceConfig.withFallback(graphProps).withFallback(reference).resolve();
+ StreamsConfiguration streams = StreamsConfigurator.detectConfiguration(typesafe);
+ testConfiguration = new ComponentConfigurator<>(TwitterFollowNeo4jConfiguration.class).detectConfiguration(typesafe);
+
+ }
+
+ @Test
+ public void testTwitterFollowGraph() throws Exception {
+
+ TwitterFollowNeo4j stream = new TwitterFollowNeo4j(testConfiguration);
+
+ stream.run();
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/twitter-follow-neo4j/src/test/resources/TwitterFollowGraphIT.conf
----------------------------------------------------------------------
diff --git a/local/twitter-follow-neo4j/src/test/resources/TwitterFollowGraphIT.conf b/local/twitter-follow-neo4j/src/test/resources/TwitterFollowGraphIT.conf
new file mode 100644
index 0000000..d4b4aeb
--- /dev/null
+++ b/local/twitter-follow-neo4j/src/test/resources/TwitterFollowGraphIT.conf
@@ -0,0 +1,28 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+twitter {
+ endpoint = "friends"
+ info = [
+ 18055613
+ ]
+ twitter.max_items = 1000
+}
+graph {
+ hostname = ${neo4j.http.host}
+ port = ${neo4j.http.port}
+ type = "neo4j"
+ graph = "data"
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/twitter-history-elasticsearch/src/main/java/org/apache/streams/example/TwitterHistoryElasticsearch.java
----------------------------------------------------------------------
diff --git a/local/twitter-history-elasticsearch/src/main/java/org/apache/streams/example/TwitterHistoryElasticsearch.java b/local/twitter-history-elasticsearch/src/main/java/org/apache/streams/example/TwitterHistoryElasticsearch.java
new file mode 100644
index 0000000..7d87f36
--- /dev/null
+++ b/local/twitter-history-elasticsearch/src/main/java/org/apache/streams/example/TwitterHistoryElasticsearch.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.example;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.converter.ActivityConverterProcessor;
+import org.apache.streams.core.StreamBuilder;
+import org.apache.streams.elasticsearch.ElasticsearchPersistWriter;
+import org.apache.streams.local.builders.LocalStreamBuilder;
+import org.apache.streams.twitter.provider.TwitterTimelineProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Retrieves as many posts from a known list of users as twitter API allows.
+ *
+ * Converts them to activities, and writes them in activity format to Elasticsearch.
+ */
+
+public class TwitterHistoryElasticsearch implements Runnable {
+
+ public final static String STREAMS_ID = "TwitterHistoryElasticsearch";
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(TwitterHistoryElasticsearch.class);
+
+ private static final ObjectMapper mapper = new ObjectMapper();
+
+ TwitterHistoryElasticsearchConfiguration config;
+
+ public TwitterHistoryElasticsearch() {
+ this(new ComponentConfigurator<>(TwitterHistoryElasticsearchConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig()));
+
+ }
+
+ public TwitterHistoryElasticsearch(TwitterHistoryElasticsearchConfiguration config) {
+ this.config = config;
+ }
+
+ public static void main(String[] args)
+ {
+ LOGGER.info(StreamsConfigurator.config.toString());
+
+ TwitterHistoryElasticsearch history = new TwitterHistoryElasticsearch();
+
+ new Thread(history).start();
+
+ }
+
+
+ public void run() {
+
+ TwitterTimelineProvider provider = new TwitterTimelineProvider(config.getTwitter());
+ ActivityConverterProcessor converter = new ActivityConverterProcessor();
+ ElasticsearchPersistWriter writer = new ElasticsearchPersistWriter(config.getElasticsearch());
+
+ StreamBuilder builder = new LocalStreamBuilder(500);
+
+ builder.newPerpetualStream("provider", provider);
+ builder.addStreamsProcessor("converter", converter, 2, "provider");
+ builder.addStreamsPersistWriter("writer", writer, 1, "converter");
+ builder.start();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/twitter-history-elasticsearch/src/main/java/org/apache/streams/example/twitter/TwitterHistoryElasticsearch.java
----------------------------------------------------------------------
diff --git a/local/twitter-history-elasticsearch/src/main/java/org/apache/streams/example/twitter/TwitterHistoryElasticsearch.java b/local/twitter-history-elasticsearch/src/main/java/org/apache/streams/example/twitter/TwitterHistoryElasticsearch.java
deleted file mode 100644
index 090b9ed..0000000
--- a/local/twitter-history-elasticsearch/src/main/java/org/apache/streams/example/twitter/TwitterHistoryElasticsearch.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.example.twitter;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.typesafe.config.Config;
-import org.apache.streams.config.ComponentConfigurator;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.converter.ActivityConverterProcessor;
-import org.apache.streams.core.StreamBuilder;
-import org.apache.streams.elasticsearch.ElasticsearchConfigurator;
-import org.apache.streams.elasticsearch.ElasticsearchPersistWriter;
-import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration;
-import org.apache.streams.local.builders.LocalStreamBuilder;
-import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.twitter.TwitterStreamConfiguration;
-import org.apache.streams.twitter.TwitterUserInformationConfiguration;
-import org.apache.streams.twitter.processor.TwitterTypeConverter;
-import org.apache.streams.twitter.provider.TwitterConfigurator;
-import org.apache.streams.twitter.provider.TwitterTimelineProvider;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Retrieves as many posts from a known list of users as twitter API allows.
- *
- * Converts them to activities, and writes them in activity format to Elasticsearch.
- */
-
-public class TwitterHistoryElasticsearch implements Runnable {
-
- public final static String STREAMS_ID = "TwitterHistoryElasticsearch";
-
- private final static Logger LOGGER = LoggerFactory.getLogger(TwitterHistoryElasticsearch.class);
-
- private static final ObjectMapper mapper = new ObjectMapper();
-
- TwitterHistoryElasticsearchConfiguration config;
-
- public TwitterHistoryElasticsearch() {
- this(new ComponentConfigurator<>(TwitterHistoryElasticsearchConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig()));
-
- }
-
- public TwitterHistoryElasticsearch(TwitterHistoryElasticsearchConfiguration config) {
- this.config = config;
- }
-
- public static void main(String[] args)
- {
- LOGGER.info(StreamsConfigurator.config.toString());
-
- TwitterHistoryElasticsearch history = new TwitterHistoryElasticsearch();
-
- new Thread(history).start();
-
- }
-
-
- public void run() {
-
- TwitterTimelineProvider provider = new TwitterTimelineProvider(config.getTwitter());
- ActivityConverterProcessor converter = new ActivityConverterProcessor();
- ElasticsearchPersistWriter writer = new ElasticsearchPersistWriter(config.getElasticsearch());
-
- StreamBuilder builder = new LocalStreamBuilder(500);
-
- builder.newPerpetualStream("provider", provider);
- builder.addStreamsProcessor("converter", converter, 2, "provider");
- builder.addStreamsPersistWriter("writer", writer, 1, "converter");
- builder.start();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/twitter-history-elasticsearch/src/main/jsonschema/org/apache/streams/example/twitter/TwitterHistoryElasticsearchConfiguration.json
----------------------------------------------------------------------
diff --git a/local/twitter-history-elasticsearch/src/main/jsonschema/org/apache/streams/example/twitter/TwitterHistoryElasticsearchConfiguration.json b/local/twitter-history-elasticsearch/src/main/jsonschema/org/apache/streams/example/twitter/TwitterHistoryElasticsearchConfiguration.json
index ea9b165..eaf8028 100644
--- a/local/twitter-history-elasticsearch/src/main/jsonschema/org/apache/streams/example/twitter/TwitterHistoryElasticsearchConfiguration.json
+++ b/local/twitter-history-elasticsearch/src/main/jsonschema/org/apache/streams/example/twitter/TwitterHistoryElasticsearchConfiguration.json
@@ -4,7 +4,7 @@
"http://www.apache.org/licenses/LICENSE-2.0"
],
"type": "object",
- "javaType" : "org.apache.streams.example.twitter.TwitterHistoryElasticsearchConfiguration",
+ "javaType" : "org.apache.streams.example.TwitterHistoryElasticsearchConfiguration",
"javaInterfaces": ["java.io.Serializable"],
"properties": {
"twitter": { "javaType": "org.apache.streams.twitter.TwitterUserInformationConfiguration", "type": "object", "required": true },
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/twitter-history-elasticsearch/src/test/java/org/apache/streams/example/test/TwitterHistoryElasticsearchIT.java
----------------------------------------------------------------------
diff --git a/local/twitter-history-elasticsearch/src/test/java/org/apache/streams/example/test/TwitterHistoryElasticsearchIT.java b/local/twitter-history-elasticsearch/src/test/java/org/apache/streams/example/test/TwitterHistoryElasticsearchIT.java
new file mode 100644
index 0000000..b0c9155
--- /dev/null
+++ b/local/twitter-history-elasticsearch/src/test/java/org/apache/streams/example/test/TwitterHistoryElasticsearchIT.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.example.test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfiguration;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.elasticsearch.ElasticsearchClientManager;
+import org.apache.streams.example.TwitterHistoryElasticsearch;
+import org.apache.streams.example.TwitterHistoryElasticsearchConfiguration;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
+import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
+import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
+import org.elasticsearch.action.search.SearchRequestBuilder;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.Requests;
+import org.elasticsearch.cluster.health.ClusterHealthStatus;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.Properties;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+
+/**
+ * Example stream that populates elasticsearch with activities from twitter userstream in real-time
+ */
+public class TwitterHistoryElasticsearchIT {
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(TwitterHistoryElasticsearchIT.class);
+
+ protected TwitterHistoryElasticsearchConfiguration testConfiguration;
+ protected Client testClient;
+
+ private int count = 0;
+
+ @Before
+ public void prepareTest() throws Exception {
+
+ Config reference = ConfigFactory.load();
+ File conf_file = new File("target/test-classes/TwitterHistoryElasticsearchIT.conf");
+ assert(conf_file.exists());
+ Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+ Properties es_properties = new Properties();
+ InputStream es_stream = new FileInputStream("elasticsearch.properties");
+ es_properties.load(es_stream);
+ Config esProps = ConfigFactory.parseProperties(es_properties);
+ Config typesafe = testResourceConfig.withFallback(esProps).withFallback(reference).resolve();
+ StreamsConfiguration streams = StreamsConfigurator.detectConfiguration(typesafe);
+ testConfiguration = new ComponentConfigurator<>(TwitterHistoryElasticsearchConfiguration.class).detectConfiguration(typesafe);
+ testClient = new ElasticsearchClientManager(testConfiguration.getElasticsearch()).getClient();
+
+ ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
+ ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
+ assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED);
+
+ IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getElasticsearch().getIndex());
+ IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
+ assertFalse(indicesExistsResponse.isExists());
+
+ }
+
+ @Test
+ public void testTwitterHistoryElasticsearch() throws Exception {
+
+ TwitterHistoryElasticsearch stream = new TwitterHistoryElasticsearch(testConfiguration);
+
+ stream.run();
+
+ // assert lines in file
+ SearchRequestBuilder countRequest = testClient
+ .prepareSearch(testConfiguration.getElasticsearch().getIndex())
+ .setTypes(testConfiguration.getElasticsearch().getType());
+ SearchResponse countResponse = countRequest.execute().actionGet();
+
+ count = (int)countResponse.getHits().getTotalHits();
+
+ assertNotEquals(count, 0);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/twitter-history-elasticsearch/src/test/java/org/apache/streams/twitter/example/TwitterHistoryElasticsearchIT.java
----------------------------------------------------------------------
diff --git a/local/twitter-history-elasticsearch/src/test/java/org/apache/streams/twitter/example/TwitterHistoryElasticsearchIT.java b/local/twitter-history-elasticsearch/src/test/java/org/apache/streams/twitter/example/TwitterHistoryElasticsearchIT.java
deleted file mode 100644
index b8e1b64..0000000
--- a/local/twitter-history-elasticsearch/src/test/java/org/apache/streams/twitter/example/TwitterHistoryElasticsearchIT.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.twitter.example;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigParseOptions;
-import org.apache.streams.config.ComponentConfigurator;
-import org.apache.streams.config.StreamsConfiguration;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.elasticsearch.ElasticsearchClientManager;
-import org.apache.streams.example.twitter.TwitterHistoryElasticsearch;
-import org.apache.streams.example.twitter.TwitterHistoryElasticsearchConfiguration;
-import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
-import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
-import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
-import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
-import org.elasticsearch.action.search.SearchRequestBuilder;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.client.Requests;
-import org.elasticsearch.cluster.health.ClusterHealthStatus;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.InputStream;
-import java.util.Properties;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
-
-/**
- * Example stream that populates elasticsearch with activities from twitter userstream in real-time
- */
-public class TwitterHistoryElasticsearchIT {
-
- private final static Logger LOGGER = LoggerFactory.getLogger(TwitterHistoryElasticsearchIT.class);
-
- protected TwitterHistoryElasticsearchConfiguration testConfiguration;
- protected Client testClient;
-
- private int count = 0;
-
- @Before
- public void prepareTest() throws Exception {
-
- Config reference = ConfigFactory.load();
- File conf_file = new File("target/test-classes/TwitterHistoryElasticsearchIT.conf");
- assert(conf_file.exists());
- Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
- Properties es_properties = new Properties();
- InputStream es_stream = new FileInputStream("elasticsearch.properties");
- es_properties.load(es_stream);
- Config esProps = ConfigFactory.parseProperties(es_properties);
- Config typesafe = testResourceConfig.withFallback(esProps).withFallback(reference).resolve();
- StreamsConfiguration streams = StreamsConfigurator.detectConfiguration(typesafe);
- testConfiguration = new ComponentConfigurator<>(TwitterHistoryElasticsearchConfiguration.class).detectConfiguration(typesafe);
- testClient = new ElasticsearchClientManager(testConfiguration.getElasticsearch()).getClient();
-
- ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
- ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
- assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED);
-
- IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getElasticsearch().getIndex());
- IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
- assertFalse(indicesExistsResponse.isExists());
-
- }
-
- @Test
- public void testTwitterHistoryElasticsearch() throws Exception {
-
- TwitterHistoryElasticsearch stream = new TwitterHistoryElasticsearch(testConfiguration);
-
- stream.run();
-
- // assert lines in file
- SearchRequestBuilder countRequest = testClient
- .prepareSearch(testConfiguration.getElasticsearch().getIndex())
- .setTypes(testConfiguration.getElasticsearch().getType());
- SearchResponse countResponse = countRequest.execute().actionGet();
-
- count = (int)countResponse.getHits().getTotalHits();
-
- assertNotEquals(count, 0);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/twitter-userstream-elasticsearch/src/main/java/org/apache/streams/example/TwitterUserstreamElasticsearch.java
----------------------------------------------------------------------
diff --git a/local/twitter-userstream-elasticsearch/src/main/java/org/apache/streams/example/TwitterUserstreamElasticsearch.java b/local/twitter-userstream-elasticsearch/src/main/java/org/apache/streams/example/TwitterUserstreamElasticsearch.java
new file mode 100644
index 0000000..f1e776a
--- /dev/null
+++ b/local/twitter-userstream-elasticsearch/src/main/java/org/apache/streams/example/TwitterUserstreamElasticsearch.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.example;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.converter.ActivityConverterProcessor;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.elasticsearch.ElasticsearchPersistDeleter;
+import org.apache.streams.elasticsearch.ElasticsearchPersistWriter;
+import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration;
+import org.apache.streams.example.TwitterUserstreamElasticsearchConfiguration;
+import org.apache.streams.filters.VerbDefinitionDropFilter;
+import org.apache.streams.filters.VerbDefinitionKeepFilter;
+import org.apache.streams.local.builders.LocalStreamBuilder;
+import org.apache.streams.core.StreamBuilder;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.twitter.TwitterStreamConfiguration;
+import org.apache.streams.twitter.provider.TwitterStreamProvider;
+import org.apache.streams.verbs.ObjectCombination;
+import org.apache.streams.verbs.VerbDefinition;
+import org.elasticsearch.common.Strings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Example stream that populates elasticsearch with activities from twitter userstream in real-time
+ */
+public class TwitterUserstreamElasticsearch implements Runnable {
+
+ public final static String STREAMS_ID = "TwitterUserstreamElasticsearch";
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(TwitterUserstreamElasticsearch.class);
+
+ /* this pattern will match any/only deletes */
+ private static VerbDefinition deleteVerbDefinition =
+ new VerbDefinition()
+ .withValue("delete")
+ .withObjects(Lists.newArrayList(new ObjectCombination()));
+
+ TwitterUserstreamElasticsearchConfiguration config;
+
+ public TwitterUserstreamElasticsearch() {
+ this(new ComponentConfigurator<>(TwitterUserstreamElasticsearchConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig()));
+
+ }
+
+ public TwitterUserstreamElasticsearch(TwitterUserstreamElasticsearchConfiguration config) {
+ this.config = config;
+ }
+
+ public static void main(String[] args)
+ {
+ LOGGER.info(StreamsConfigurator.config.toString());
+
+ TwitterUserstreamElasticsearch userstream = new TwitterUserstreamElasticsearch();
+ new Thread(userstream).start();
+
+ }
+
+ @Override
+ public void run() {
+
+ TwitterStreamConfiguration twitterStreamConfiguration = config.getTwitter();
+ ElasticsearchWriterConfiguration elasticsearchWriterConfiguration = config.getElasticsearch();
+
+ TwitterStreamProvider stream = new TwitterStreamProvider(twitterStreamConfiguration);
+ ActivityConverterProcessor converter = new ActivityConverterProcessor();
+ VerbDefinitionDropFilter noDeletesProcessor = new VerbDefinitionDropFilter(Sets.newHashSet(deleteVerbDefinition));
+ ElasticsearchPersistWriter writer = new ElasticsearchPersistWriter(elasticsearchWriterConfiguration);
+ VerbDefinitionKeepFilter deleteOnlyProcessor = new VerbDefinitionKeepFilter(Sets.newHashSet(deleteVerbDefinition));
+ SetDeleteIdProcessor setDeleteIdProcessor = new SetDeleteIdProcessor();
+ ElasticsearchPersistDeleter deleter = new ElasticsearchPersistDeleter(elasticsearchWriterConfiguration);
+
+ Map<String, Object> streamConfig = Maps.newHashMap();
+ streamConfig.put(LocalStreamBuilder.TIMEOUT_KEY, 12 * 60 * 1000);
+ StreamBuilder builder = new LocalStreamBuilder(25, streamConfig);
+
+ builder.newPerpetualStream(TwitterStreamProvider.STREAMS_ID, stream);
+ builder.addStreamsProcessor("converter", converter, 2, TwitterStreamProvider.STREAMS_ID);
+ builder.addStreamsProcessor("NoDeletesProcessor", noDeletesProcessor, 1, "converter");
+ builder.addStreamsPersistWriter(ElasticsearchPersistWriter.STREAMS_ID, writer, 1, "NoDeletesProcessor");
+ builder.addStreamsProcessor("DeleteOnlyProcessor", deleteOnlyProcessor, 1, "converter");
+ builder.addStreamsProcessor("SetDeleteIdProcessor", setDeleteIdProcessor, 1, "DeleteOnlyProcessor");
+ builder.addStreamsPersistWriter("deleter", deleter, 1, "SetDeleteIdProcessor");
+
+ builder.start();
+
+ }
+
+ protected class SetDeleteIdProcessor implements StreamsProcessor {
+
+ public String getId() {
+ return "TwitterUserstreamElasticsearch.SetDeleteIdProcessor";
+ }
+
+ @Override
+ public List<StreamsDatum> process(StreamsDatum entry) {
+
+ Preconditions.checkArgument(entry.getDocument() instanceof Activity);
+ String id = entry.getId();
+ // replace delete with post in id
+ // ensure ElasticsearchPersistDeleter will remove original post if present
+ id = Strings.replace(id, "delete", "post");
+ entry.setId(id);
+
+ return Lists.newArrayList(entry);
+ }
+
+ @Override
+ public void prepare(Object configurationObject) {
+
+
+ }
+
+ @Override
+ public void cleanUp() {
+
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/twitter-userstream-elasticsearch/src/main/java/org/apache/streams/twitter/example/TwitterUserstreamElasticsearch.java
----------------------------------------------------------------------
diff --git a/local/twitter-userstream-elasticsearch/src/main/java/org/apache/streams/twitter/example/TwitterUserstreamElasticsearch.java b/local/twitter-userstream-elasticsearch/src/main/java/org/apache/streams/twitter/example/TwitterUserstreamElasticsearch.java
deleted file mode 100644
index c483742..0000000
--- a/local/twitter-userstream-elasticsearch/src/main/java/org/apache/streams/twitter/example/TwitterUserstreamElasticsearch.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.twitter.example;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import org.apache.streams.config.ComponentConfigurator;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.converter.ActivityConverterProcessor;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProcessor;
-import org.apache.streams.elasticsearch.ElasticsearchPersistDeleter;
-import org.apache.streams.elasticsearch.ElasticsearchPersistWriter;
-import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration;
-import org.apache.streams.example.twitter.TwitterUserstreamElasticsearchConfiguration;
-import org.apache.streams.filters.VerbDefinitionDropFilter;
-import org.apache.streams.filters.VerbDefinitionKeepFilter;
-import org.apache.streams.local.builders.LocalStreamBuilder;
-import org.apache.streams.core.StreamBuilder;
-import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.twitter.TwitterStreamConfiguration;
-import org.apache.streams.twitter.provider.TwitterStreamProvider;
-import org.apache.streams.verbs.ObjectCombination;
-import org.apache.streams.verbs.VerbDefinition;
-import org.elasticsearch.common.Strings;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * Example stream that populates elasticsearch with activities from twitter userstream in real-time
- */
-public class TwitterUserstreamElasticsearch implements Runnable {
-
- public final static String STREAMS_ID = "TwitterUserstreamElasticsearch";
-
- private final static Logger LOGGER = LoggerFactory.getLogger(TwitterUserstreamElasticsearch.class);
-
- /* this pattern will match any/only deletes */
- private static VerbDefinition deleteVerbDefinition =
- new VerbDefinition()
- .withValue("delete")
- .withObjects(Lists.newArrayList(new ObjectCombination()));
-
- TwitterUserstreamElasticsearchConfiguration config;
-
- public TwitterUserstreamElasticsearch() {
- this(new ComponentConfigurator<>(TwitterUserstreamElasticsearchConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig()));
-
- }
-
- public TwitterUserstreamElasticsearch(TwitterUserstreamElasticsearchConfiguration config) {
- this.config = config;
- }
-
- public static void main(String[] args)
- {
- LOGGER.info(StreamsConfigurator.config.toString());
-
- TwitterUserstreamElasticsearch userstream = new TwitterUserstreamElasticsearch();
- new Thread(userstream).start();
-
- }
-
- @Override
- public void run() {
-
- TwitterStreamConfiguration twitterStreamConfiguration = config.getTwitter();
- ElasticsearchWriterConfiguration elasticsearchWriterConfiguration = config.getElasticsearch();
-
- TwitterStreamProvider stream = new TwitterStreamProvider(twitterStreamConfiguration);
- ActivityConverterProcessor converter = new ActivityConverterProcessor();
- VerbDefinitionDropFilter noDeletesProcessor = new VerbDefinitionDropFilter(Sets.newHashSet(deleteVerbDefinition));
- ElasticsearchPersistWriter writer = new ElasticsearchPersistWriter(elasticsearchWriterConfiguration);
- VerbDefinitionKeepFilter deleteOnlyProcessor = new VerbDefinitionKeepFilter(Sets.newHashSet(deleteVerbDefinition));
- SetDeleteIdProcessor setDeleteIdProcessor = new SetDeleteIdProcessor();
- ElasticsearchPersistDeleter deleter = new ElasticsearchPersistDeleter(elasticsearchWriterConfiguration);
-
- Map<String, Object> streamConfig = Maps.newHashMap();
- streamConfig.put(LocalStreamBuilder.TIMEOUT_KEY, 12 * 60 * 1000);
- StreamBuilder builder = new LocalStreamBuilder(25, streamConfig);
-
- builder.newPerpetualStream(TwitterStreamProvider.STREAMS_ID, stream);
- builder.addStreamsProcessor("converter", converter, 2, TwitterStreamProvider.STREAMS_ID);
- builder.addStreamsProcessor("NoDeletesProcessor", noDeletesProcessor, 1, "converter");
- builder.addStreamsPersistWriter(ElasticsearchPersistWriter.STREAMS_ID, writer, 1, "NoDeletesProcessor");
- builder.addStreamsProcessor("DeleteOnlyProcessor", deleteOnlyProcessor, 1, "converter");
- builder.addStreamsProcessor("SetDeleteIdProcessor", setDeleteIdProcessor, 1, "DeleteOnlyProcessor");
- builder.addStreamsPersistWriter("deleter", deleter, 1, "SetDeleteIdProcessor");
-
- builder.start();
-
- }
-
- protected class SetDeleteIdProcessor implements StreamsProcessor {
-
- public String getId() {
- return "TwitterUserstreamElasticsearch.SetDeleteIdProcessor";
- }
-
- @Override
- public List<StreamsDatum> process(StreamsDatum entry) {
-
- Preconditions.checkArgument(entry.getDocument() instanceof Activity);
- String id = entry.getId();
- // replace delete with post in id
- // ensure ElasticsearchPersistDeleter will remove original post if present
- id = Strings.replace(id, "delete", "post");
- entry.setId(id);
-
- return Lists.newArrayList(entry);
- }
-
- @Override
- public void prepare(Object configurationObject) {
-
-
- }
-
- @Override
- public void cleanUp() {
-
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/twitter-userstream-elasticsearch/src/main/jsonschema/org/apache/streams/example/twitter/TwitterUserstreamElasticsearchConfiguration.json
----------------------------------------------------------------------
diff --git a/local/twitter-userstream-elasticsearch/src/main/jsonschema/org/apache/streams/example/twitter/TwitterUserstreamElasticsearchConfiguration.json b/local/twitter-userstream-elasticsearch/src/main/jsonschema/org/apache/streams/example/twitter/TwitterUserstreamElasticsearchConfiguration.json
index 6a25850..7261439 100644
--- a/local/twitter-userstream-elasticsearch/src/main/jsonschema/org/apache/streams/example/twitter/TwitterUserstreamElasticsearchConfiguration.json
+++ b/local/twitter-userstream-elasticsearch/src/main/jsonschema/org/apache/streams/example/twitter/TwitterUserstreamElasticsearchConfiguration.json
@@ -4,7 +4,7 @@
"http://www.apache.org/licenses/LICENSE-2.0"
],
"type": "object",
- "javaType" : "org.apache.streams.example.twitter.TwitterUserstreamElasticsearchConfiguration",
+ "javaType" : "org.apache.streams.example.TwitterUserstreamElasticsearchConfiguration",
"javaInterfaces": ["java.io.Serializable"],
"properties": {
"twitter": { "javaType": "org.apache.streams.twitter.TwitterStreamConfiguration", "type": "object", "required": true },
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/twitter-userstream-elasticsearch/src/test/java/org/apache/streams/example/test/TwitterUserstreamElasticsearchIT.java
----------------------------------------------------------------------
diff --git a/local/twitter-userstream-elasticsearch/src/test/java/org/apache/streams/example/test/TwitterUserstreamElasticsearchIT.java b/local/twitter-userstream-elasticsearch/src/test/java/org/apache/streams/example/test/TwitterUserstreamElasticsearchIT.java
new file mode 100644
index 0000000..7ba9940
--- /dev/null
+++ b/local/twitter-userstream-elasticsearch/src/test/java/org/apache/streams/example/test/TwitterUserstreamElasticsearchIT.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.example.test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfiguration;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.elasticsearch.ElasticsearchClientManager;
+import org.apache.streams.example.TwitterUserstreamElasticsearchConfiguration;
+import org.apache.streams.example.TwitterUserstreamElasticsearch;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
+import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
+import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
+import org.elasticsearch.action.search.SearchRequestBuilder;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.Requests;
+import org.elasticsearch.cluster.health.ClusterHealthStatus;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.Properties;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+
+/**
+ * Test copying documents between two indexes on same cluster
+ */
+public class TwitterUserstreamElasticsearchIT {
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(TwitterUserstreamElasticsearchIT.class);
+
+ protected TwitterUserstreamElasticsearchConfiguration testConfiguration;
+ protected Client testClient;
+
+ private int count = 0;
+
+ @Before
+ public void prepareTest() throws Exception {
+
+ Config reference = ConfigFactory.load();
+ File conf_file = new File("target/test-classes/TwitterUserstreamElasticsearchIT.conf");
+ assert(conf_file.exists());
+ Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+ Properties es_properties = new Properties();
+ InputStream es_stream = new FileInputStream("elasticsearch.properties");
+ es_properties.load(es_stream);
+ Config esProps = ConfigFactory.parseProperties(es_properties);
+ Config typesafe = testResourceConfig.withFallback(esProps).withFallback(reference).resolve();
+ StreamsConfiguration streams = StreamsConfigurator.detectConfiguration(typesafe);
+ testConfiguration = new ComponentConfigurator<>(TwitterUserstreamElasticsearchConfiguration.class).detectConfiguration(typesafe);
+ testClient = new ElasticsearchClientManager(testConfiguration.getElasticsearch()).getClient();
+
+ ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
+ ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
+ assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED);
+
+ IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getElasticsearch().getIndex());
+ IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
+ assertFalse(indicesExistsResponse.isExists());
+
+ }
+
+ @Test
+ public void testReindex() throws Exception {
+
+ TwitterUserstreamElasticsearch stream = new TwitterUserstreamElasticsearch(testConfiguration);
+
+ stream.run();
+
+ // assert lines in file
+ SearchRequestBuilder countRequest = testClient
+ .prepareSearch(testConfiguration.getElasticsearch().getIndex())
+ .setTypes(testConfiguration.getElasticsearch().getType());
+ SearchResponse countResponse = countRequest.execute().actionGet();
+
+ count = (int)countResponse.getHits().getTotalHits();
+
+ assertNotEquals(count, 0);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/twitter-userstream-elasticsearch/src/test/java/org/apache/streams/example/twitter/test/TwitterUserstreamElasticsearchIT.java
----------------------------------------------------------------------
diff --git a/local/twitter-userstream-elasticsearch/src/test/java/org/apache/streams/example/twitter/test/TwitterUserstreamElasticsearchIT.java b/local/twitter-userstream-elasticsearch/src/test/java/org/apache/streams/example/twitter/test/TwitterUserstreamElasticsearchIT.java
deleted file mode 100644
index 2f524f0..0000000
--- a/local/twitter-userstream-elasticsearch/src/test/java/org/apache/streams/example/twitter/test/TwitterUserstreamElasticsearchIT.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.example.twitter.test;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigParseOptions;
-import org.apache.streams.config.ComponentConfigurator;
-import org.apache.streams.config.StreamsConfiguration;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.elasticsearch.ElasticsearchClientManager;
-import org.apache.streams.example.twitter.TwitterUserstreamElasticsearchConfiguration;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.twitter.example.TwitterUserstreamElasticsearch;
-import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
-import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
-import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
-import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
-import org.elasticsearch.action.search.SearchRequestBuilder;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.client.Requests;
-import org.elasticsearch.cluster.health.ClusterHealthStatus;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.InputStream;
-import java.util.Properties;
-
-import static junit.framework.TestCase.assertTrue;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
-
-/**
- * Test copying documents between two indexes on same cluster
- */
-public class TwitterUserstreamElasticsearchIT {
-
- private final static Logger LOGGER = LoggerFactory.getLogger(TwitterUserstreamElasticsearchIT.class);
-
- protected TwitterUserstreamElasticsearchConfiguration testConfiguration;
- protected Client testClient;
-
- private int count = 0;
-
- @Before
- public void prepareTest() throws Exception {
-
- Config reference = ConfigFactory.load();
- File conf_file = new File("target/test-classes/TwitterUserstreamElasticsearchIT.conf");
- assert(conf_file.exists());
- Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
- Properties es_properties = new Properties();
- InputStream es_stream = new FileInputStream("elasticsearch.properties");
- es_properties.load(es_stream);
- Config esProps = ConfigFactory.parseProperties(es_properties);
- Config typesafe = testResourceConfig.withFallback(esProps).withFallback(reference).resolve();
- StreamsConfiguration streams = StreamsConfigurator.detectConfiguration(typesafe);
- testConfiguration = new ComponentConfigurator<>(TwitterUserstreamElasticsearchConfiguration.class).detectConfiguration(typesafe);
- testClient = new ElasticsearchClientManager(testConfiguration.getElasticsearch()).getClient();
-
- ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
- ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
- assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED);
-
- IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getElasticsearch().getIndex());
- IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
- assertFalse(indicesExistsResponse.isExists());
-
- }
-
- @Test
- public void testReindex() throws Exception {
-
- TwitterUserstreamElasticsearch stream = new TwitterUserstreamElasticsearch(testConfiguration);
-
- stream.run();
-
- // assert lines in file
- SearchRequestBuilder countRequest = testClient
- .prepareSearch(testConfiguration.getElasticsearch().getIndex())
- .setTypes(testConfiguration.getElasticsearch().getType());
- SearchResponse countResponse = countRequest.execute().actionGet();
-
- count = (int)countResponse.getHits().getTotalHits();
-
- assertNotEquals(count, 0);
- }
-}