You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2016/10/14 16:24:52 UTC
[4/9] incubator-streams-examples git commit: normalize package names
in streams-examples/local
normalize package names in streams-examples/local
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/commit/5b96588c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/tree/5b96588c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/diff/5b96588c
Branch: refs/heads/master
Commit: 5b96588c492cfafa1732e5d5a680df8485bdb491
Parents: e949f58
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Authored: Tue Oct 11 16:30:11 2016 -0500
Committer: Steve Blackmon @steveblackmon <sb...@apache.org>
Committed: Tue Oct 11 16:39:07 2016 -0500
----------------------------------------------------------------------
.../example/ElasticsearchHdfs.java | 85 -----
.../example/HdfsElasticsearch.java | 86 -----
.../streams/example/ElasticsearchHdfs.java | 80 +++++
.../streams/example/HdfsElasticsearch.java | 80 +++++
.../ElasticsearchHdfsConfiguration.json | 2 +-
.../HdfsElasticsearchConfiguration.json | 2 +-
.../elasticsearch/test/ElasticsearchHdfsIT.java | 128 --------
.../example/elasticsearch/test/ExampleITs.java | 17 -
.../elasticsearch/test/HdfsElasticsearchIT.java | 132 --------
.../example/test/ElasticsearchHdfsIT.java | 112 +++++++
.../apache/streams/example/test/ExampleITs.java | 17 +
.../example/test/HdfsElasticsearchIT.java | 118 +++++++
.../example/ElasticsearchReindex.java | 84 -----
.../streams/example/ElasticsearchReindex.java | 80 +++++
.../ElasticsearchReindexConfiguration.json | 2 +-
.../test/ElasticsearchReindexChildIT.java | 121 -------
.../test/ElasticsearchReindexIT.java | 120 -------
.../test/ElasticsearchReindexParentIT.java | 133 --------
.../example/elasticsearch/test/ReindexITs.java | 20 --
.../test/ElasticsearchReindexChildIT.java | 121 +++++++
.../example/test/ElasticsearchReindexIT.java | 120 +++++++
.../test/ElasticsearchReindexParentIT.java | 133 ++++++++
.../apache/streams/example/test/ReindexITs.java | 20 ++
.../example/MongoElasticsearchSync.java | 79 -----
.../streams/example/MongoElasticsearchSync.java | 79 +++++
.../MongoElasticsearchSyncConfiguration.json | 2 +-
.../mongodb/test/MongoElasticsearchSyncIT.java | 121 -------
.../streams/example/mongodb/test/SyncITs.java | 16 -
.../example/test/MongoElasticsearchSyncIT.java | 117 +++++++
.../apache/streams/example/test/SyncITs.java | 16 +
.../src/test/resources/testSync.json | 21 --
local/pom.xml | 5 +-
local/twitter-follow-graph/README.md | 8 -
local/twitter-follow-graph/pom.xml | 316 -------------------
.../example/graph/TwitterFollowGraph.java | 103 ------
.../TwitterFollowGraphConfiguration.json | 13 -
.../src/main/resources/TwitterFollowGraph.dot | 39 ---
.../src/site/markdown/index.md | 75 -----
.../twitter/example/TwitterFollowGraphIT.java | 79 -----
.../test/resources/TwitterFollowGraphIT.conf | 28 --
local/twitter-follow-neo4j/README.md | 8 +
...itter-follow-graph-jar-with-dependencies.jar | Bin 0 -> 25829072 bytes
local/twitter-follow-neo4j/pom.xml | 316 +++++++++++++++++++
.../streams/example/TwitterFollowNeo4j.java | 93 ++++++
.../TwitterFollowNeo4jConfiguration.json | 13 +
.../src/main/resources/TwitterFollowNeo4j.dot | 39 +++
.../src/site/markdown/TwitterFollowNeo4j.md | 33 ++
.../src/site/markdown/index.md | 42 +++
.../src/site/resources/TwitterFollowGraph.dot | 39 +++
.../TwitterFollowGraphConfiguration.json | 13 +
.../src/site/resources/TwitterFollowNeo4j.dot | 39 +++
.../TwitterFollowNeo4jConfiguration.json | 13 +
local/twitter-follow-neo4j/src/site/site.xml | 45 +++
.../example/test/TwitterFollowNeo4jIT.java | 79 +++++
.../test/resources/TwitterFollowGraphIT.conf | 28 ++
.../example/TwitterHistoryElasticsearch.java | 81 +++++
.../twitter/TwitterHistoryElasticsearch.java | 90 ------
...witterHistoryElasticsearchConfiguration.json | 2 +-
.../test/TwitterHistoryElasticsearchIT.java | 108 +++++++
.../example/TwitterHistoryElasticsearchIT.java | 108 -------
.../example/TwitterUserstreamElasticsearch.java | 146 +++++++++
.../example/TwitterUserstreamElasticsearch.java | 146 ---------
...terUserstreamElasticsearchConfiguration.json | 2 +-
.../test/TwitterUserstreamElasticsearchIT.java | 109 +++++++
.../test/TwitterUserstreamElasticsearchIT.java | 111 -------
65 files changed, 2344 insertions(+), 2289 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/elasticsearch-hdfs/src/main/java/org/apache/streams/elasticsearch/example/ElasticsearchHdfs.java
----------------------------------------------------------------------
diff --git a/local/elasticsearch-hdfs/src/main/java/org/apache/streams/elasticsearch/example/ElasticsearchHdfs.java b/local/elasticsearch-hdfs/src/main/java/org/apache/streams/elasticsearch/example/ElasticsearchHdfs.java
deleted file mode 100644
index da0acbd..0000000
--- a/local/elasticsearch-hdfs/src/main/java/org/apache/streams/elasticsearch/example/ElasticsearchHdfs.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.elasticsearch.example;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Maps;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.typesafe.config.Config;
-import org.apache.streams.config.ComponentConfigurator;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.elasticsearch.ElasticsearchPersistReader;
-import org.apache.streams.hdfs.WebHdfsPersistWriter;
-import org.apache.streams.core.StreamBuilder;
-import org.apache.streams.local.builders.LocalStreamBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-import java.util.concurrent.*;
-
-/**
- * Copies documents into a new index
- */
-public class ElasticsearchHdfs implements Runnable {
-
- public final static String STREAMS_ID = "ElasticsearchHdfs";
-
- private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchHdfs.class);
-
- ElasticsearchHdfsConfiguration config;
-
- public ElasticsearchHdfs() {
- this(new ComponentConfigurator<>(ElasticsearchHdfsConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig()));
-
- }
-
- public ElasticsearchHdfs(ElasticsearchHdfsConfiguration reindex) {
- this.config = reindex;
- }
-
- public static void main(String[] args)
- {
- LOGGER.info(StreamsConfigurator.config.toString());
-
- ElasticsearchHdfs backup = new ElasticsearchHdfs();
-
- new Thread(backup).start();
-
- }
-
- @Override
- public void run() {
-
- ElasticsearchPersistReader elasticsearchPersistReader = new ElasticsearchPersistReader(config.getSource());
-
- WebHdfsPersistWriter hdfsPersistWriter = new WebHdfsPersistWriter(config.getDestination());
-
- Map<String, Object> streamConfig = Maps.newHashMap();
- streamConfig.put(LocalStreamBuilder.STREAM_IDENTIFIER_KEY, STREAMS_ID);
- streamConfig.put(LocalStreamBuilder.TIMEOUT_KEY, 7 * 24 * 60 * 1000);
- StreamBuilder builder = new LocalStreamBuilder(1000, streamConfig);
-
- builder.newPerpetualStream(ElasticsearchPersistReader.STREAMS_ID, elasticsearchPersistReader);
- builder.addStreamsPersistWriter(WebHdfsPersistWriter.STREAMS_ID, hdfsPersistWriter, 1, ElasticsearchPersistReader.STREAMS_ID);
- builder.start();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/elasticsearch-hdfs/src/main/java/org/apache/streams/elasticsearch/example/HdfsElasticsearch.java
----------------------------------------------------------------------
diff --git a/local/elasticsearch-hdfs/src/main/java/org/apache/streams/elasticsearch/example/HdfsElasticsearch.java b/local/elasticsearch-hdfs/src/main/java/org/apache/streams/elasticsearch/example/HdfsElasticsearch.java
deleted file mode 100644
index 0a65479..0000000
--- a/local/elasticsearch-hdfs/src/main/java/org/apache/streams/elasticsearch/example/HdfsElasticsearch.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.elasticsearch.example;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Maps;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.typesafe.config.Config;
-import org.apache.streams.config.ComponentConfigurator;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.hdfs.WebHdfsPersistReader;
-import org.apache.streams.elasticsearch.ElasticsearchPersistWriter;
-import org.apache.streams.core.StreamBuilder;
-import org.apache.streams.local.builders.LocalStreamBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.math.BigInteger;
-import java.util.Map;
-import java.util.concurrent.*;
-
-/**
- * Copies documents into a new index
- */
-public class HdfsElasticsearch implements Runnable {
-
- public final static String STREAMS_ID = "HdfsElasticsearch";
-
- private final static Logger LOGGER = LoggerFactory.getLogger(HdfsElasticsearch.class);
-
- HdfsElasticsearchConfiguration config;
-
- public HdfsElasticsearch() {
- this(new ComponentConfigurator<>(HdfsElasticsearchConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig()));
-
- }
-
- public HdfsElasticsearch(HdfsElasticsearchConfiguration reindex) {
- this.config = reindex;
- }
-
- public static void main(String[] args)
- {
- LOGGER.info(StreamsConfigurator.config.toString());
-
- HdfsElasticsearch restore = new HdfsElasticsearch();
-
- new Thread(restore).start();
-
- }
-
- @Override
- public void run() {
-
- WebHdfsPersistReader webHdfsPersistReader = new WebHdfsPersistReader(config.getSource());
-
- ElasticsearchPersistWriter elasticsearchPersistWriter = new ElasticsearchPersistWriter(config.getDestination());
-
- Map<String, Object> streamConfig = Maps.newHashMap();
- streamConfig.put(LocalStreamBuilder.STREAM_IDENTIFIER_KEY, STREAMS_ID);
- streamConfig.put(LocalStreamBuilder.TIMEOUT_KEY, 1000 * 1000);
- StreamBuilder builder = new LocalStreamBuilder(1000, streamConfig);
-
- builder.newPerpetualStream(WebHdfsPersistReader.STREAMS_ID, webHdfsPersistReader);
- builder.addStreamsPersistWriter(ElasticsearchPersistWriter.STREAMS_ID, elasticsearchPersistWriter, 1, WebHdfsPersistReader.STREAMS_ID);
- builder.start();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/ElasticsearchHdfs.java
----------------------------------------------------------------------
diff --git a/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/ElasticsearchHdfs.java b/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/ElasticsearchHdfs.java
new file mode 100644
index 0000000..8d3cf36
--- /dev/null
+++ b/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/ElasticsearchHdfs.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.example;
+
+import com.google.common.collect.Maps;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.elasticsearch.ElasticsearchPersistReader;
+import org.apache.streams.example.ElasticsearchHdfsConfiguration;
+import org.apache.streams.hdfs.WebHdfsPersistWriter;
+import org.apache.streams.core.StreamBuilder;
+import org.apache.streams.local.builders.LocalStreamBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * Copies documents from an Elasticsearch index to HDFS
+ */
+public class ElasticsearchHdfs implements Runnable {
+
+ public final static String STREAMS_ID = "ElasticsearchHdfs";
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchHdfs.class);
+
+ ElasticsearchHdfsConfiguration config;
+
+ public ElasticsearchHdfs() {
+ this(new ComponentConfigurator<>(ElasticsearchHdfsConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig()));
+
+ }
+
+ public ElasticsearchHdfs(ElasticsearchHdfsConfiguration reindex) {
+ this.config = reindex;
+ }
+
+ public static void main(String[] args)
+ {
+ LOGGER.info(StreamsConfigurator.config.toString());
+
+ ElasticsearchHdfs backup = new ElasticsearchHdfs();
+
+ new Thread(backup).start();
+
+ }
+
+ @Override
+ public void run() {
+
+ ElasticsearchPersistReader elasticsearchPersistReader = new ElasticsearchPersistReader(config.getSource());
+
+ WebHdfsPersistWriter hdfsPersistWriter = new WebHdfsPersistWriter(config.getDestination());
+
+ Map<String, Object> streamConfig = Maps.newHashMap();
+ streamConfig.put(LocalStreamBuilder.STREAM_IDENTIFIER_KEY, STREAMS_ID);
+ streamConfig.put(LocalStreamBuilder.TIMEOUT_KEY, 7 * 24 * 60 * 1000);
+ StreamBuilder builder = new LocalStreamBuilder(1000, streamConfig);
+
+ builder.newPerpetualStream(ElasticsearchPersistReader.STREAMS_ID, elasticsearchPersistReader);
+ builder.addStreamsPersistWriter(WebHdfsPersistWriter.STREAMS_ID, hdfsPersistWriter, 1, ElasticsearchPersistReader.STREAMS_ID);
+ builder.start();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/HdfsElasticsearch.java
----------------------------------------------------------------------
diff --git a/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/HdfsElasticsearch.java b/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/HdfsElasticsearch.java
new file mode 100644
index 0000000..847ac48
--- /dev/null
+++ b/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/HdfsElasticsearch.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.example;
+
+import com.google.common.collect.Maps;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.example.HdfsElasticsearchConfiguration;
+import org.apache.streams.hdfs.WebHdfsPersistReader;
+import org.apache.streams.elasticsearch.ElasticsearchPersistWriter;
+import org.apache.streams.core.StreamBuilder;
+import org.apache.streams.local.builders.LocalStreamBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * Copies documents from HDFS into an Elasticsearch index
+ */
+public class HdfsElasticsearch implements Runnable {
+
+ public final static String STREAMS_ID = "HdfsElasticsearch";
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(HdfsElasticsearch.class);
+
+ HdfsElasticsearchConfiguration config;
+
+ public HdfsElasticsearch() {
+ this(new ComponentConfigurator<>(HdfsElasticsearchConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig()));
+
+ }
+
+ public HdfsElasticsearch(HdfsElasticsearchConfiguration reindex) {
+ this.config = reindex;
+ }
+
+ public static void main(String[] args)
+ {
+ LOGGER.info(StreamsConfigurator.config.toString());
+
+ HdfsElasticsearch restore = new HdfsElasticsearch();
+
+ new Thread(restore).start();
+
+ }
+
+ @Override
+ public void run() {
+
+ WebHdfsPersistReader webHdfsPersistReader = new WebHdfsPersistReader(config.getSource());
+
+ ElasticsearchPersistWriter elasticsearchPersistWriter = new ElasticsearchPersistWriter(config.getDestination());
+
+ Map<String, Object> streamConfig = Maps.newHashMap();
+ streamConfig.put(LocalStreamBuilder.STREAM_IDENTIFIER_KEY, STREAMS_ID);
+ streamConfig.put(LocalStreamBuilder.TIMEOUT_KEY, 1000 * 1000);
+ StreamBuilder builder = new LocalStreamBuilder(1000, streamConfig);
+
+ builder.newPerpetualStream(WebHdfsPersistReader.STREAMS_ID, webHdfsPersistReader);
+ builder.addStreamsPersistWriter(ElasticsearchPersistWriter.STREAMS_ID, elasticsearchPersistWriter, 1, WebHdfsPersistReader.STREAMS_ID);
+ builder.start();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/elasticsearch-hdfs/src/main/jsonschema/ElasticsearchHdfsConfiguration.json
----------------------------------------------------------------------
diff --git a/local/elasticsearch-hdfs/src/main/jsonschema/ElasticsearchHdfsConfiguration.json b/local/elasticsearch-hdfs/src/main/jsonschema/ElasticsearchHdfsConfiguration.json
index 9ad7e54..ee17a3d 100644
--- a/local/elasticsearch-hdfs/src/main/jsonschema/ElasticsearchHdfsConfiguration.json
+++ b/local/elasticsearch-hdfs/src/main/jsonschema/ElasticsearchHdfsConfiguration.json
@@ -4,7 +4,7 @@
"http://www.apache.org/licenses/LICENSE-2.0"
],
"type": "object",
- "javaType" : "org.apache.streams.elasticsearch.example.ElasticsearchHdfsConfiguration",
+ "javaType" : "org.apache.streams.example.ElasticsearchHdfsConfiguration",
"javaInterfaces": ["java.io.Serializable"],
"properties": {
"source": { "javaType": "org.apache.streams.elasticsearch.ElasticsearchReaderConfiguration", "type": "object", "required": true },
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/elasticsearch-hdfs/src/main/jsonschema/HdfsElasticsearchConfiguration.json
----------------------------------------------------------------------
diff --git a/local/elasticsearch-hdfs/src/main/jsonschema/HdfsElasticsearchConfiguration.json b/local/elasticsearch-hdfs/src/main/jsonschema/HdfsElasticsearchConfiguration.json
index 8b77225..4239279 100644
--- a/local/elasticsearch-hdfs/src/main/jsonschema/HdfsElasticsearchConfiguration.json
+++ b/local/elasticsearch-hdfs/src/main/jsonschema/HdfsElasticsearchConfiguration.json
@@ -4,7 +4,7 @@
"http://www.apache.org/licenses/LICENSE-2.0"
],
"type": "object",
- "javaType" : "org.apache.streams.elasticsearch.example.HdfsElasticsearchConfiguration",
+ "javaType" : "org.apache.streams.example.HdfsElasticsearchConfiguration",
"javaInterfaces": ["java.io.Serializable"],
"properties": {
"source": { "javaType": "org.apache.streams.hdfs.HdfsReaderConfiguration", "type": "object", "required": true },
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/elasticsearch/test/ElasticsearchHdfsIT.java
----------------------------------------------------------------------
diff --git a/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/elasticsearch/test/ElasticsearchHdfsIT.java b/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/elasticsearch/test/ElasticsearchHdfsIT.java
deleted file mode 100644
index 8dfe244..0000000
--- a/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/elasticsearch/test/ElasticsearchHdfsIT.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.example.elasticsearch.test;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.Uninterruptibles;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigParseOptions;
-import org.apache.commons.io.Charsets;
-import org.apache.commons.io.IOUtils;
-import org.apache.streams.config.ComponentConfigurator;
-import org.apache.streams.config.StreamsConfiguration;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.elasticsearch.ElasticsearchClientManager;
-import org.apache.streams.elasticsearch.ElasticsearchConfiguration;
-import org.apache.streams.elasticsearch.ElasticsearchPersistWriter;
-import org.apache.streams.elasticsearch.ElasticsearchReaderConfiguration;
-import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration;
-import org.apache.streams.elasticsearch.example.ElasticsearchHdfs;
-import org.apache.streams.elasticsearch.example.ElasticsearchHdfsConfiguration;
-import org.apache.streams.elasticsearch.example.HdfsElasticsearch;
-import org.apache.streams.elasticsearch.example.HdfsElasticsearchConfiguration;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.pojo.json.Activity;
-import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
-import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
-import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
-import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
-import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
-import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
-import org.elasticsearch.action.search.SearchRequestBuilder;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.client.Requests;
-import org.elasticsearch.cluster.health.ClusterHealthStatus;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.InputStream;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-
-import static junit.framework.TestCase.assertTrue;
-import static org.junit.Assert.assertNotEquals;
-
-/**
- * Test copying documents between hdfs and elasticsearch
- */
-public class ElasticsearchHdfsIT {
-
- private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchHdfsIT.class);
-
- ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
-
- protected ElasticsearchHdfsConfiguration testConfiguration;
- protected Client testClient;
-
- private int count = 0;
-
- @Before
- public void prepareTest() throws Exception {
-
- Config reference = ConfigFactory.load();
- File conf_file = new File("target/test-classes/ElasticsearchHdfsIT.conf");
- assert(conf_file.exists());
- Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
- Properties es_properties = new Properties();
- InputStream es_stream = new FileInputStream("elasticsearch.properties");
- es_properties.load(es_stream);
- Config esProps = ConfigFactory.parseProperties(es_properties);
- Config typesafe = testResourceConfig.withFallback(esProps).withFallback(reference).resolve();
- StreamsConfiguration streams = StreamsConfigurator.detectConfiguration(typesafe);
- testConfiguration = new ComponentConfigurator<>(ElasticsearchHdfsConfiguration.class).detectConfiguration(typesafe);
- testClient = new ElasticsearchClientManager(testConfiguration.getSource()).getClient();
-
- ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
- ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
- assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED);
-
- IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getSource().getIndexes().get(0));
- IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
- assertTrue(indicesExistsResponse.isExists());
-
- SearchRequestBuilder countRequest = testClient
- .prepareSearch(testConfiguration.getSource().getIndexes().get(0))
- .setTypes(testConfiguration.getSource().getTypes().get(0));
- SearchResponse countResponse = countRequest.execute().actionGet();
-
- count = (int)countResponse.getHits().getTotalHits();
-
- assertNotEquals(count, 0);
- }
-
- @Test
- public void ElasticsearchHdfsIT() throws Exception {
-
- ElasticsearchHdfs backup = new ElasticsearchHdfs(testConfiguration);
-
- backup.run();
-
- // assert lines in file
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/elasticsearch/test/ExampleITs.java
----------------------------------------------------------------------
diff --git a/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/elasticsearch/test/ExampleITs.java b/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/elasticsearch/test/ExampleITs.java
deleted file mode 100644
index ab882c8..0000000
--- a/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/elasticsearch/test/ExampleITs.java
+++ /dev/null
@@ -1,17 +0,0 @@
-package org.apache.streams.example.elasticsearch.test;
-
-import org.apache.streams.elasticsearch.test.ElasticsearchPersistWriterIT;
-import org.junit.runner.RunWith;
-import org.junit.runners.Suite;
-
-@RunWith(Suite.class)
-@Suite.SuiteClasses({
- ElasticsearchPersistWriterIT.class,
- ElasticsearchHdfsIT.class,
- HdfsElasticsearchIT.class,
-})
-
-public class ExampleITs {
- // the class remains empty,
- // used only as a holder for the above annotations
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/elasticsearch/test/HdfsElasticsearchIT.java
----------------------------------------------------------------------
diff --git a/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/elasticsearch/test/HdfsElasticsearchIT.java b/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/elasticsearch/test/HdfsElasticsearchIT.java
deleted file mode 100644
index 1a055f6..0000000
--- a/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/elasticsearch/test/HdfsElasticsearchIT.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.example.elasticsearch.test;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.Uninterruptibles;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigParseOptions;
-import org.apache.commons.io.Charsets;
-import org.apache.commons.io.IOUtils;
-import org.apache.streams.config.ComponentConfigurator;
-import org.apache.streams.config.StreamsConfiguration;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.elasticsearch.ElasticsearchClientManager;
-import org.apache.streams.elasticsearch.ElasticsearchConfiguration;
-import org.apache.streams.elasticsearch.ElasticsearchPersistWriter;
-import org.apache.streams.elasticsearch.ElasticsearchReaderConfiguration;
-import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration;
-import org.apache.streams.elasticsearch.example.ElasticsearchHdfs;
-import org.apache.streams.elasticsearch.example.ElasticsearchHdfsConfiguration;
-import org.apache.streams.elasticsearch.example.HdfsElasticsearch;
-import org.apache.streams.elasticsearch.example.HdfsElasticsearchConfiguration;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.pojo.json.Activity;
-import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
-import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
-import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
-import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
-import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
-import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
-import org.elasticsearch.action.search.SearchRequestBuilder;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.client.Requests;
-import org.elasticsearch.cluster.health.ClusterHealthStatus;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.InputStream;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-
-import static junit.framework.TestCase.assertTrue;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-
-/**
- * Test copying documents between hdfs and elasticsearch
- */
-public class HdfsElasticsearchIT {
-
- private final static Logger LOGGER = LoggerFactory.getLogger(HdfsElasticsearchIT.class);
-
- ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
-
- protected HdfsElasticsearchConfiguration testConfiguration;
- protected Client testClient;
-
- @Before
- public void prepareTest() throws Exception {
-
- Config reference = ConfigFactory.load();
- File conf_file = new File("target/test-classes/HdfsElasticsearchIT.conf");
- assert(conf_file.exists());
- Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
- Properties es_properties = new Properties();
- InputStream es_stream = new FileInputStream("elasticsearch.properties");
- es_properties.load(es_stream);
- Config esProps = ConfigFactory.parseProperties(es_properties);
- Config typesafe = testResourceConfig.withFallback(esProps).withFallback(reference).resolve();
- StreamsConfiguration streams = StreamsConfigurator.detectConfiguration(typesafe);
- testConfiguration = new ComponentConfigurator<>(HdfsElasticsearchConfiguration.class).detectConfiguration(typesafe);
- testClient = new ElasticsearchClientManager(testConfiguration.getDestination()).getClient();
-
- ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
- ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
- assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED);
-
- IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getDestination().getIndex());
- IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
- if(indicesExistsResponse.isExists()) {
- DeleteIndexRequest deleteIndexRequest = Requests.deleteIndexRequest(testConfiguration.getDestination().getIndex());
- DeleteIndexResponse deleteIndexResponse = testClient.admin().indices().delete(deleteIndexRequest).actionGet();
- assertTrue(deleteIndexResponse.isAcknowledged());
- };
- }
-
- @Test
- public void ElasticsearchHdfsIT() throws Exception {
-
- HdfsElasticsearch restore = new HdfsElasticsearch(testConfiguration);
-
- restore.run();
-
- IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getDestination().getIndex());
- IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
- assertTrue(indicesExistsResponse.isExists());
-
- SearchRequestBuilder countRequest = testClient
- .prepareSearch(testConfiguration.getDestination().getIndex())
- .setTypes(testConfiguration.getDestination().getType());
- SearchResponse countResponse = countRequest.execute().actionGet();
-
- assertEquals(89, countResponse.getHits().getTotalHits());
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/ElasticsearchHdfsIT.java
----------------------------------------------------------------------
diff --git a/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/ElasticsearchHdfsIT.java b/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/ElasticsearchHdfsIT.java
new file mode 100644
index 0000000..8e87f3a
--- /dev/null
+++ b/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/ElasticsearchHdfsIT.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.example.test;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfiguration;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.elasticsearch.ElasticsearchClientManager;
+import org.apache.streams.example.ElasticsearchHdfs;
+import org.apache.streams.example.ElasticsearchHdfsConfiguration;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
+import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
+import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
+import org.elasticsearch.action.search.SearchRequestBuilder;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.Requests;
+import org.elasticsearch.cluster.health.ClusterHealthStatus;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.Properties;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertNotEquals;
+
+/**
+ * Integration test that runs the ElasticsearchHdfs example to copy documents from elasticsearch to hdfs
+ */
+public class ElasticsearchHdfsIT {
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchHdfsIT.class);
+
+ ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+
+ protected ElasticsearchHdfsConfiguration testConfiguration;
+ protected Client testClient;
+
+ private int count = 0;
+
+ @Before
+ public void prepareTest() throws Exception {
+
+ Config reference = ConfigFactory.load();
+ File conf_file = new File("target/test-classes/ElasticsearchHdfsIT.conf");
+ assert(conf_file.exists());
+ Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+ Properties es_properties = new Properties();
+ InputStream es_stream = new FileInputStream("elasticsearch.properties");
+ es_properties.load(es_stream);
+ Config esProps = ConfigFactory.parseProperties(es_properties);
+ Config typesafe = testResourceConfig.withFallback(esProps).withFallback(reference).resolve();
+ StreamsConfiguration streams = StreamsConfigurator.detectConfiguration(typesafe);
+ testConfiguration = new ComponentConfigurator<>(ElasticsearchHdfsConfiguration.class).detectConfiguration(typesafe);
+ testClient = new ElasticsearchClientManager(testConfiguration.getSource()).getClient();
+
+ ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
+ ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
+ assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED);
+
+ IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getSource().getIndexes().get(0));
+ IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
+ assertTrue(indicesExistsResponse.isExists());
+
+ SearchRequestBuilder countRequest = testClient
+ .prepareSearch(testConfiguration.getSource().getIndexes().get(0))
+ .setTypes(testConfiguration.getSource().getTypes().get(0));
+ SearchResponse countResponse = countRequest.execute().actionGet();
+
+ count = (int)countResponse.getHits().getTotalHits();
+
+ assertNotEquals(count, 0);
+ }
+
+ @Test
+ public void ElasticsearchHdfsIT() throws Exception {
+
+ ElasticsearchHdfs backup = new ElasticsearchHdfs(testConfiguration);
+
+ backup.run();
+
+ // TODO: assert lines in file (the output is not verified yet)
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/ExampleITs.java
----------------------------------------------------------------------
diff --git a/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/ExampleITs.java b/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/ExampleITs.java
new file mode 100644
index 0000000..5965914
--- /dev/null
+++ b/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/ExampleITs.java
@@ -0,0 +1,17 @@
+package org.apache.streams.example.test;
+
+import org.apache.streams.elasticsearch.test.ElasticsearchPersistWriterIT;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+@RunWith(Suite.class)
+@Suite.SuiteClasses({
+ ElasticsearchPersistWriterIT.class,
+ ElasticsearchHdfsIT.class,
+ HdfsElasticsearchIT.class,
+})
+
+public class ExampleITs {
+ // the class remains empty,
+ // used only as a holder for the above annotations
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/HdfsElasticsearchIT.java
----------------------------------------------------------------------
diff --git a/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/HdfsElasticsearchIT.java b/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/HdfsElasticsearchIT.java
new file mode 100644
index 0000000..4eb7fc0
--- /dev/null
+++ b/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/HdfsElasticsearchIT.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.example.test;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfiguration;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.elasticsearch.ElasticsearchClientManager;
+import org.apache.streams.example.HdfsElasticsearch;
+import org.apache.streams.example.HdfsElasticsearchConfiguration;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
+import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
+import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
+import org.elasticsearch.action.search.SearchRequestBuilder;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.Requests;
+import org.elasticsearch.cluster.health.ClusterHealthStatus;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.Properties;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+/**
+ * Integration test that runs the HdfsElasticsearch example to copy documents from hdfs to elasticsearch
+ */
+public class HdfsElasticsearchIT {
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(HdfsElasticsearchIT.class);
+
+ ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+
+ protected HdfsElasticsearchConfiguration testConfiguration;
+ protected Client testClient;
+
+ @Before
+ public void prepareTest() throws Exception {
+
+ Config reference = ConfigFactory.load();
+ File conf_file = new File("target/test-classes/HdfsElasticsearchIT.conf");
+ assert(conf_file.exists());
+ Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+ Properties es_properties = new Properties();
+ InputStream es_stream = new FileInputStream("elasticsearch.properties");
+ es_properties.load(es_stream);
+ Config esProps = ConfigFactory.parseProperties(es_properties);
+ Config typesafe = testResourceConfig.withFallback(esProps).withFallback(reference).resolve();
+ StreamsConfiguration streams = StreamsConfigurator.detectConfiguration(typesafe);
+ testConfiguration = new ComponentConfigurator<>(HdfsElasticsearchConfiguration.class).detectConfiguration(typesafe);
+ testClient = new ElasticsearchClientManager(testConfiguration.getDestination()).getClient();
+
+ ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
+ ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
+ assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED);
+
+ IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getDestination().getIndex());
+ IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
+ if(indicesExistsResponse.isExists()) {
+ DeleteIndexRequest deleteIndexRequest = Requests.deleteIndexRequest(testConfiguration.getDestination().getIndex());
+ DeleteIndexResponse deleteIndexResponse = testClient.admin().indices().delete(deleteIndexRequest).actionGet();
+ assertTrue(deleteIndexResponse.isAcknowledged());
+ };
+ }
+
+ @Test
+ public void ElasticsearchHdfsIT() throws Exception {
+
+ HdfsElasticsearch restore = new HdfsElasticsearch(testConfiguration);
+
+ restore.run();
+
+ IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getDestination().getIndex());
+ IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
+ assertTrue(indicesExistsResponse.isExists());
+
+ SearchRequestBuilder countRequest = testClient
+ .prepareSearch(testConfiguration.getDestination().getIndex())
+ .setTypes(testConfiguration.getDestination().getType());
+ SearchResponse countResponse = countRequest.execute().actionGet();
+
+ assertEquals(89, countResponse.getHits().getTotalHits());
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/elasticsearch-reindex/src/main/java/org/apache/streams/elasticsearch/example/ElasticsearchReindex.java
----------------------------------------------------------------------
diff --git a/local/elasticsearch-reindex/src/main/java/org/apache/streams/elasticsearch/example/ElasticsearchReindex.java b/local/elasticsearch-reindex/src/main/java/org/apache/streams/elasticsearch/example/ElasticsearchReindex.java
deleted file mode 100644
index dc94773..0000000
--- a/local/elasticsearch-reindex/src/main/java/org/apache/streams/elasticsearch/example/ElasticsearchReindex.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.elasticsearch.example;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Maps;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.typesafe.config.Config;
-import org.apache.streams.config.ComponentConfigurator;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.elasticsearch.*;
-import org.apache.streams.core.StreamBuilder;
-import org.apache.streams.local.builders.LocalStreamBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-import java.util.concurrent.*;
-
-/**
- * Copies documents into a new index
- */
-public class ElasticsearchReindex implements Runnable {
-
- public final static String STREAMS_ID = "ElasticsearchReindex";
-
- private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchReindex.class);
-
- ElasticsearchReindexConfiguration config;
-
- public ElasticsearchReindex() {
- this(new ComponentConfigurator<>(ElasticsearchReindexConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig()));
-
- }
-
- public ElasticsearchReindex(ElasticsearchReindexConfiguration reindex) {
- this.config = reindex;
- }
-
- public static void main(String[] args)
- {
- LOGGER.info(StreamsConfigurator.config.toString());
-
- ElasticsearchReindex reindex = new ElasticsearchReindex();
-
- new Thread(reindex).start();
-
- }
-
- @Override
- public void run() {
-
- ElasticsearchPersistReader elasticsearchPersistReader = new ElasticsearchPersistReader(config.getSource());
-
- ElasticsearchPersistWriter elasticsearchPersistWriter = new ElasticsearchPersistWriter(config.getDestination());
-
- Map<String, Object> streamConfig = Maps.newHashMap();
- streamConfig.put(LocalStreamBuilder.STREAM_IDENTIFIER_KEY, STREAMS_ID);
- streamConfig.put(LocalStreamBuilder.TIMEOUT_KEY, 7 * 24 * 60 * 1000);
- StreamBuilder builder = new LocalStreamBuilder(1000, streamConfig);
-
- builder.newPerpetualStream(ElasticsearchPersistReader.STREAMS_ID, elasticsearchPersistReader);
- builder.addStreamsPersistWriter(ElasticsearchPersistWriter.STREAMS_ID, elasticsearchPersistWriter, 1, ElasticsearchPersistReader.STREAMS_ID);
- builder.start();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/elasticsearch-reindex/src/main/java/org/apache/streams/example/ElasticsearchReindex.java
----------------------------------------------------------------------
diff --git a/local/elasticsearch-reindex/src/main/java/org/apache/streams/example/ElasticsearchReindex.java b/local/elasticsearch-reindex/src/main/java/org/apache/streams/example/ElasticsearchReindex.java
new file mode 100644
index 0000000..dfb2a98
--- /dev/null
+++ b/local/elasticsearch-reindex/src/main/java/org/apache/streams/example/ElasticsearchReindex.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.example;
+
+import com.google.common.collect.Maps;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamBuilder;
+import org.apache.streams.elasticsearch.ElasticsearchPersistReader;
+import org.apache.streams.elasticsearch.ElasticsearchPersistWriter;
+import org.apache.streams.example.ElasticsearchReindexConfiguration;
+import org.apache.streams.local.builders.LocalStreamBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * Copies documents from the configured source index into the configured destination index
+ */
+public class ElasticsearchReindex implements Runnable {
+
+ public final static String STREAMS_ID = "ElasticsearchReindex";
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchReindex.class);
+
+ ElasticsearchReindexConfiguration config;
+
+ public ElasticsearchReindex() {
+ this(new ComponentConfigurator<>(ElasticsearchReindexConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig()));
+
+ }
+
+ public ElasticsearchReindex(ElasticsearchReindexConfiguration reindex) {
+ this.config = reindex;
+ }
+
+ public static void main(String[] args)
+ {
+ LOGGER.info(StreamsConfigurator.config.toString());
+
+ ElasticsearchReindex reindex = new ElasticsearchReindex();
+
+ new Thread(reindex).start();
+
+ }
+
+ @Override
+ public void run() {
+
+ ElasticsearchPersistReader elasticsearchPersistReader = new ElasticsearchPersistReader(config.getSource());
+
+ ElasticsearchPersistWriter elasticsearchPersistWriter = new ElasticsearchPersistWriter(config.getDestination());
+
+ Map<String, Object> streamConfig = Maps.newHashMap();
+ streamConfig.put(LocalStreamBuilder.STREAM_IDENTIFIER_KEY, STREAMS_ID);
+ streamConfig.put(LocalStreamBuilder.TIMEOUT_KEY, 7 * 24 * 60 * 1000);
+ StreamBuilder builder = new LocalStreamBuilder(1000, streamConfig);
+
+ builder.newPerpetualStream(ElasticsearchPersistReader.STREAMS_ID, elasticsearchPersistReader);
+ builder.addStreamsPersistWriter(ElasticsearchPersistWriter.STREAMS_ID, elasticsearchPersistWriter, 1, ElasticsearchPersistReader.STREAMS_ID);
+ builder.start();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/elasticsearch-reindex/src/main/jsonschema/ElasticsearchReindexConfiguration.json
----------------------------------------------------------------------
diff --git a/local/elasticsearch-reindex/src/main/jsonschema/ElasticsearchReindexConfiguration.json b/local/elasticsearch-reindex/src/main/jsonschema/ElasticsearchReindexConfiguration.json
index 1237538..09bdf5b 100644
--- a/local/elasticsearch-reindex/src/main/jsonschema/ElasticsearchReindexConfiguration.json
+++ b/local/elasticsearch-reindex/src/main/jsonschema/ElasticsearchReindexConfiguration.json
@@ -4,7 +4,7 @@
"http://www.apache.org/licenses/LICENSE-2.0"
],
"type": "object",
- "javaType" : "org.apache.streams.elasticsearch.example.ElasticsearchReindexConfiguration",
+ "javaType" : "org.apache.streams.example.ElasticsearchReindexConfiguration",
"javaInterfaces": ["java.io.Serializable"],
"properties": {
"source": { "javaType": "org.apache.streams.elasticsearch.ElasticsearchReaderConfiguration", "type": "object", "required": true },
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/elasticsearch/test/ElasticsearchReindexChildIT.java
----------------------------------------------------------------------
diff --git a/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/elasticsearch/test/ElasticsearchReindexChildIT.java b/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/elasticsearch/test/ElasticsearchReindexChildIT.java
deleted file mode 100644
index d033014..0000000
--- a/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/elasticsearch/test/ElasticsearchReindexChildIT.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.example.elasticsearch.test;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigParseOptions;
-import org.apache.streams.config.ComponentConfigurator;
-import org.apache.streams.config.StreamsConfiguration;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.elasticsearch.ElasticsearchClientManager;
-import org.apache.streams.elasticsearch.example.ElasticsearchReindex;
-import org.apache.streams.elasticsearch.example.ElasticsearchReindexConfiguration;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
-import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
-import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
-import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
-import org.elasticsearch.action.search.SearchRequestBuilder;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.client.Requests;
-import org.elasticsearch.cluster.health.ClusterHealthStatus;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.InputStream;
-import java.util.Properties;
-
-import static junit.framework.TestCase.assertTrue;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-
-/**
- * Test copying parent/child associated documents between two indexes on same cluster
- */
-public class ElasticsearchReindexChildIT {
-
- private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchReindexIT.class);
-
- ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
-
- protected ElasticsearchReindexConfiguration testConfiguration;
- protected Client testClient;
-
- private int count = 0;
-
- @Before
- public void prepareTest() throws Exception {
-
- Config reference = ConfigFactory.load();
- File conf_file = new File("target/test-classes/ElasticsearchReindexChildIT.conf");
- assert(conf_file.exists());
- Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
- Properties es_properties = new Properties();
- InputStream es_stream = new FileInputStream("elasticsearch.properties");
- es_properties.load(es_stream);
- Config esProps = ConfigFactory.parseProperties(es_properties);
- Config typesafe = testResourceConfig.withFallback(esProps).withFallback(reference).resolve();
- StreamsConfiguration streams = StreamsConfigurator.detectConfiguration(typesafe);
- testConfiguration = new ComponentConfigurator<>(ElasticsearchReindexConfiguration.class).detectConfiguration(typesafe);
- testClient = new ElasticsearchClientManager(testConfiguration.getSource()).getClient();
-
- ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
- ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
- assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED);
-
- IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getSource().getIndexes().get(0));
- IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
- assertTrue(indicesExistsResponse.isExists());
-
- SearchRequestBuilder countRequest = testClient
- .prepareSearch(testConfiguration.getSource().getIndexes().get(0))
- .setTypes(testConfiguration.getSource().getTypes().get(0));
- SearchResponse countResponse = countRequest.execute().actionGet();
-
- count = (int)countResponse.getHits().getTotalHits();
-
- assertNotEquals(count, 0);
-
- }
-
- @Test
- public void testReindex() throws Exception {
-
- ElasticsearchReindex reindex = new ElasticsearchReindex(testConfiguration);
-
- reindex.run();
-
- // assert lines in file
- SearchRequestBuilder countRequest = testClient
- .prepareSearch(testConfiguration.getDestination().getIndex())
- .setTypes(testConfiguration.getDestination().getType());
- SearchResponse countResponse = countRequest.execute().actionGet();
-
- assertEquals(count, (int)countResponse.getHits().getTotalHits());
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/elasticsearch/test/ElasticsearchReindexIT.java
----------------------------------------------------------------------
diff --git a/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/elasticsearch/test/ElasticsearchReindexIT.java b/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/elasticsearch/test/ElasticsearchReindexIT.java
deleted file mode 100644
index 5854ac0..0000000
--- a/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/elasticsearch/test/ElasticsearchReindexIT.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.example.elasticsearch.test;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigParseOptions;
-import org.apache.streams.config.ComponentConfigurator;
-import org.apache.streams.config.StreamsConfiguration;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.elasticsearch.ElasticsearchClientManager;
-import org.apache.streams.elasticsearch.example.ElasticsearchReindex;
-import org.apache.streams.elasticsearch.example.ElasticsearchReindexConfiguration;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
-import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
-import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
-import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
-import org.elasticsearch.action.search.SearchRequestBuilder;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.client.Requests;
-import org.elasticsearch.cluster.health.ClusterHealthStatus;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.InputStream;
-import java.util.Properties;
-
-import static junit.framework.TestCase.assertTrue;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-
-/**
- * Test copying documents between two indexes on same cluster
- */
-public class ElasticsearchReindexIT {
-
- private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchReindexIT.class);
-
- ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
-
- protected ElasticsearchReindexConfiguration testConfiguration;
- protected Client testClient;
-
- private int count = 0;
-
- @Before
- public void prepareTest() throws Exception {
-
- Config reference = ConfigFactory.load();
- File conf_file = new File("target/test-classes/ElasticsearchReindexIT.conf");
- assert(conf_file.exists());
- Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
- Properties es_properties = new Properties();
- InputStream es_stream = new FileInputStream("elasticsearch.properties");
- es_properties.load(es_stream);
- Config esProps = ConfigFactory.parseProperties(es_properties);
- Config typesafe = testResourceConfig.withFallback(esProps).withFallback(reference).resolve();
- StreamsConfiguration streams = StreamsConfigurator.detectConfiguration(typesafe);
- testConfiguration = new ComponentConfigurator<>(ElasticsearchReindexConfiguration.class).detectConfiguration(typesafe);
- testClient = new ElasticsearchClientManager(testConfiguration.getSource()).getClient();
-
- ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
- ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
- assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED);
-
- IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getSource().getIndexes().get(0));
- IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
- assertTrue(indicesExistsResponse.isExists());
-
- SearchRequestBuilder countRequest = testClient
- .prepareSearch(testConfiguration.getSource().getIndexes().get(0))
- .setTypes(testConfiguration.getSource().getTypes().get(0));
- SearchResponse countResponse = countRequest.execute().actionGet();
-
- count = (int)countResponse.getHits().getTotalHits();
-
- assertNotEquals(count, 0);
-
- }
-
- @Test
- public void testReindex() throws Exception {
-
- ElasticsearchReindex reindex = new ElasticsearchReindex(testConfiguration);
-
- reindex.run();
-
- // assert lines in file
- SearchRequestBuilder countRequest = testClient
- .prepareSearch(testConfiguration.getDestination().getIndex())
- .setTypes(testConfiguration.getDestination().getType());
- SearchResponse countResponse = countRequest.execute().actionGet();
-
- assertEquals(count, (int)countResponse.getHits().getTotalHits());
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/elasticsearch/test/ElasticsearchReindexParentIT.java
----------------------------------------------------------------------
diff --git a/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/elasticsearch/test/ElasticsearchReindexParentIT.java b/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/elasticsearch/test/ElasticsearchReindexParentIT.java
deleted file mode 100644
index 90924e7..0000000
--- a/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/elasticsearch/test/ElasticsearchReindexParentIT.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.example.elasticsearch.test;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigParseOptions;
-import org.apache.streams.config.ComponentConfigurator;
-import org.apache.streams.config.StreamsConfiguration;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.elasticsearch.ElasticsearchClientManager;
-import org.apache.streams.elasticsearch.example.ElasticsearchReindex;
-import org.apache.streams.elasticsearch.example.ElasticsearchReindexConfiguration;
-import org.apache.streams.elasticsearch.test.ElasticsearchParentChildWriterIT;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
-import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
-import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
-import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
-import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequestBuilder;
-import org.elasticsearch.action.search.SearchRequestBuilder;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.client.Requests;
-import org.elasticsearch.cluster.health.ClusterHealthStatus;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.InputStream;
-import java.net.URL;
-import java.util.Properties;
-
-import static junit.framework.TestCase.assertTrue;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-
-/**
- * Test copying parent/child associated documents between two indexes on same cluster
- */
-public class ElasticsearchReindexParentIT {
-
- private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchReindexIT.class);
-
- ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
-
- protected ElasticsearchReindexConfiguration testConfiguration;
- protected Client testClient;
-
- private int count = 0;
-
- @Before
- public void prepareTest() throws Exception {
-
- Config reference = ConfigFactory.load();
- File conf_file = new File("target/test-classes/ElasticsearchReindexParentIT.conf");
- assert(conf_file.exists());
- Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
- Properties es_properties = new Properties();
- InputStream es_stream = new FileInputStream("elasticsearch.properties");
- es_properties.load(es_stream);
- Config esProps = ConfigFactory.parseProperties(es_properties);
- Config typesafe = testResourceConfig.withFallback(esProps).withFallback(reference).resolve();
- StreamsConfiguration streams = StreamsConfigurator.detectConfiguration(typesafe);
- testConfiguration = new ComponentConfigurator<>(ElasticsearchReindexConfiguration.class).detectConfiguration(typesafe);
- testClient = new ElasticsearchClientManager(testConfiguration.getSource()).getClient();
-
- ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
- ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
- assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED);
-
- IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getSource().getIndexes().get(0));
- IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
- assertTrue(indicesExistsResponse.isExists());
-
- SearchRequestBuilder countRequest = testClient
- .prepareSearch(testConfiguration.getSource().getIndexes().get(0))
- .setTypes(testConfiguration.getSource().getTypes().get(0));
- SearchResponse countResponse = countRequest.execute().actionGet();
-
- count = (int)countResponse.getHits().getTotalHits();
-
- PutIndexTemplateRequestBuilder putTemplateRequestBuilder = testClient.admin().indices().preparePutTemplate("mappings");
- URL templateURL = ElasticsearchParentChildWriterIT.class.getResource("/ActivityChildObjectParent.json");
- ObjectNode template = MAPPER.readValue(templateURL, ObjectNode.class);
- String templateSource = MAPPER.writeValueAsString(template);
- putTemplateRequestBuilder.setSource(templateSource);
-
- testClient.admin().indices().putTemplate(putTemplateRequestBuilder.request()).actionGet();
-
- assertNotEquals(count, 0);
-
- }
-
- @Test
- public void testReindex() throws Exception {
-
- ElasticsearchReindex reindex = new ElasticsearchReindex(testConfiguration);
-
- reindex.run();
-
- // assert lines in file
- SearchRequestBuilder countRequest = testClient
- .prepareSearch(testConfiguration.getDestination().getIndex())
- .setTypes(testConfiguration.getDestination().getType());
- SearchResponse countResponse = countRequest.execute().actionGet();
-
- assertEquals(count, (int)countResponse.getHits().getTotalHits());
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/elasticsearch/test/ReindexITs.java
----------------------------------------------------------------------
diff --git a/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/elasticsearch/test/ReindexITs.java b/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/elasticsearch/test/ReindexITs.java
deleted file mode 100644
index 9c46d31..0000000
--- a/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/elasticsearch/test/ReindexITs.java
+++ /dev/null
@@ -1,20 +0,0 @@
-package org.apache.streams.example.elasticsearch.test;
-
-import org.apache.streams.elasticsearch.test.ElasticsearchParentChildWriterIT;
-import org.apache.streams.elasticsearch.test.ElasticsearchPersistWriterIT;
-import org.junit.runner.RunWith;
-import org.junit.runners.Suite;
-
-@RunWith(Suite.class)
-@Suite.SuiteClasses({
- ElasticsearchPersistWriterIT.class,
- ElasticsearchParentChildWriterIT.class,
- ElasticsearchReindexIT.class,
- ElasticsearchReindexParentIT.class,
- ElasticsearchReindexChildIT.class
-})
-
-public class ReindexITs {
- // the class remains empty,
- // used only as a holder for the above annotations
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/test/ElasticsearchReindexChildIT.java
----------------------------------------------------------------------
diff --git a/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/test/ElasticsearchReindexChildIT.java b/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/test/ElasticsearchReindexChildIT.java
new file mode 100644
index 0000000..47c8f51
--- /dev/null
+++ b/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/test/ElasticsearchReindexChildIT.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.example.test;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfiguration;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.elasticsearch.ElasticsearchClientManager;
+import org.apache.streams.example.ElasticsearchReindex;
+import org.apache.streams.example.ElasticsearchReindexConfiguration;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
+import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
+import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
+import org.elasticsearch.action.search.SearchRequestBuilder;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.Requests;
+import org.elasticsearch.cluster.health.ClusterHealthStatus;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.Properties;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+/**
+ * Test copying parent/child associated documents between two indexes on the same cluster
+ */
+public class ElasticsearchReindexChildIT {
+
+  private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchReindexChildIT.class);
+
+  ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+
+  protected ElasticsearchReindexConfiguration testConfiguration;
+  protected Client testClient;
+
+  // Number of hits in the source index/type, captured by prepareTest().
+  private int count = 0;
+
+  /**
+   * Loads the test configuration, connects to the source cluster and records the source hit count.
+   */
+  @Before
+  public void prepareTest() throws Exception {
+    Config reference = ConfigFactory.load();
+    File conf_file = new File("target/test-classes/ElasticsearchReindexChildIT.conf");
+    // JUnit assertion instead of the assert keyword, so the check also runs without -ea.
+    assertTrue(conf_file.exists());
+    Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+    Properties es_properties = new Properties();
+    // try-with-resources closes the properties stream even when load() throws.
+    try (InputStream es_stream = new FileInputStream("elasticsearch.properties")) {
+      es_properties.load(es_stream);
+    }
+    Config esProps = ConfigFactory.parseProperties(es_properties);
+    Config typesafe = testResourceConfig.withFallback(esProps).withFallback(reference).resolve();
+    StreamsConfiguration streams = StreamsConfigurator.detectConfiguration(typesafe);
+    testConfiguration = new ComponentConfigurator<>(ElasticsearchReindexConfiguration.class).detectConfiguration(typesafe);
+    testClient = new ElasticsearchClientManager(testConfiguration.getSource()).getClient();
+
+    ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
+    ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
+    assertNotEquals(ClusterHealthStatus.RED, clusterHealthResponse.getStatus());
+
+    IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getSource().getIndexes().get(0));
+    IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
+    assertTrue(indicesExistsResponse.isExists());
+
+    SearchRequestBuilder countRequest = testClient
+        .prepareSearch(testConfiguration.getSource().getIndexes().get(0))
+        .setTypes(testConfiguration.getSource().getTypes().get(0));
+    SearchResponse countResponse = countRequest.execute().actionGet();
+    count = (int)countResponse.getHits().getTotalHits();
+    // The source must hold documents, otherwise the comparison in testReindex() proves nothing.
+    assertNotEquals(0, count);
+  }
+
+  /** Runs the reindex and checks that the destination holds as many hits as the source did. */
+  @Test
+  public void testReindex() throws Exception {
+    ElasticsearchReindex reindex = new ElasticsearchReindex(testConfiguration);
+    reindex.run();
+
+    // count the documents that landed in the destination index/type
+    SearchRequestBuilder countRequest = testClient
+        .prepareSearch(testConfiguration.getDestination().getIndex())
+        .setTypes(testConfiguration.getDestination().getType());
+    SearchResponse countResponse = countRequest.execute().actionGet();
+    assertEquals(count, (int)countResponse.getHits().getTotalHits());
+  }
+}