You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2016/10/14 16:24:52 UTC

[4/9] incubator-streams-examples git commit: normalize package names in streams-examples/local

normalize package names in streams-examples/local


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/commit/5b96588c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/tree/5b96588c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/diff/5b96588c

Branch: refs/heads/master
Commit: 5b96588c492cfafa1732e5d5a680df8485bdb491
Parents: e949f58
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Authored: Tue Oct 11 16:30:11 2016 -0500
Committer: Steve Blackmon @steveblackmon <sb...@apache.org>
Committed: Tue Oct 11 16:39:07 2016 -0500

----------------------------------------------------------------------
 .../example/ElasticsearchHdfs.java              |  85 -----
 .../example/HdfsElasticsearch.java              |  86 -----
 .../streams/example/ElasticsearchHdfs.java      |  80 +++++
 .../streams/example/HdfsElasticsearch.java      |  80 +++++
 .../ElasticsearchHdfsConfiguration.json         |   2 +-
 .../HdfsElasticsearchConfiguration.json         |   2 +-
 .../elasticsearch/test/ElasticsearchHdfsIT.java | 128 --------
 .../example/elasticsearch/test/ExampleITs.java  |  17 -
 .../elasticsearch/test/HdfsElasticsearchIT.java | 132 --------
 .../example/test/ElasticsearchHdfsIT.java       | 112 +++++++
 .../apache/streams/example/test/ExampleITs.java |  17 +
 .../example/test/HdfsElasticsearchIT.java       | 118 +++++++
 .../example/ElasticsearchReindex.java           |  84 -----
 .../streams/example/ElasticsearchReindex.java   |  80 +++++
 .../ElasticsearchReindexConfiguration.json      |   2 +-
 .../test/ElasticsearchReindexChildIT.java       | 121 -------
 .../test/ElasticsearchReindexIT.java            | 120 -------
 .../test/ElasticsearchReindexParentIT.java      | 133 --------
 .../example/elasticsearch/test/ReindexITs.java  |  20 --
 .../test/ElasticsearchReindexChildIT.java       | 121 +++++++
 .../example/test/ElasticsearchReindexIT.java    | 120 +++++++
 .../test/ElasticsearchReindexParentIT.java      | 133 ++++++++
 .../apache/streams/example/test/ReindexITs.java |  20 ++
 .../example/MongoElasticsearchSync.java         |  79 -----
 .../streams/example/MongoElasticsearchSync.java |  79 +++++
 .../MongoElasticsearchSyncConfiguration.json    |   2 +-
 .../mongodb/test/MongoElasticsearchSyncIT.java  | 121 -------
 .../streams/example/mongodb/test/SyncITs.java   |  16 -
 .../example/test/MongoElasticsearchSyncIT.java  | 117 +++++++
 .../apache/streams/example/test/SyncITs.java    |  16 +
 .../src/test/resources/testSync.json            |  21 --
 local/pom.xml                                   |   5 +-
 local/twitter-follow-graph/README.md            |   8 -
 local/twitter-follow-graph/pom.xml              | 316 -------------------
 .../example/graph/TwitterFollowGraph.java       | 103 ------
 .../TwitterFollowGraphConfiguration.json        |  13 -
 .../src/main/resources/TwitterFollowGraph.dot   |  39 ---
 .../src/site/markdown/index.md                  |  75 -----
 .../twitter/example/TwitterFollowGraphIT.java   |  79 -----
 .../test/resources/TwitterFollowGraphIT.conf    |  28 --
 local/twitter-follow-neo4j/README.md            |   8 +
 ...itter-follow-graph-jar-with-dependencies.jar | Bin 0 -> 25829072 bytes
 local/twitter-follow-neo4j/pom.xml              | 316 +++++++++++++++++++
 .../streams/example/TwitterFollowNeo4j.java     |  93 ++++++
 .../TwitterFollowNeo4jConfiguration.json        |  13 +
 .../src/main/resources/TwitterFollowNeo4j.dot   |  39 +++
 .../src/site/markdown/TwitterFollowNeo4j.md     |  33 ++
 .../src/site/markdown/index.md                  |  42 +++
 .../src/site/resources/TwitterFollowGraph.dot   |  39 +++
 .../TwitterFollowGraphConfiguration.json        |  13 +
 .../src/site/resources/TwitterFollowNeo4j.dot   |  39 +++
 .../TwitterFollowNeo4jConfiguration.json        |  13 +
 local/twitter-follow-neo4j/src/site/site.xml    |  45 +++
 .../example/test/TwitterFollowNeo4jIT.java      |  79 +++++
 .../test/resources/TwitterFollowGraphIT.conf    |  28 ++
 .../example/TwitterHistoryElasticsearch.java    |  81 +++++
 .../twitter/TwitterHistoryElasticsearch.java    |  90 ------
 ...witterHistoryElasticsearchConfiguration.json |   2 +-
 .../test/TwitterHistoryElasticsearchIT.java     | 108 +++++++
 .../example/TwitterHistoryElasticsearchIT.java  | 108 -------
 .../example/TwitterUserstreamElasticsearch.java | 146 +++++++++
 .../example/TwitterUserstreamElasticsearch.java | 146 ---------
 ...terUserstreamElasticsearchConfiguration.json |   2 +-
 .../test/TwitterUserstreamElasticsearchIT.java  | 109 +++++++
 .../test/TwitterUserstreamElasticsearchIT.java  | 111 -------
 65 files changed, 2344 insertions(+), 2289 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/elasticsearch-hdfs/src/main/java/org/apache/streams/elasticsearch/example/ElasticsearchHdfs.java
----------------------------------------------------------------------
diff --git a/local/elasticsearch-hdfs/src/main/java/org/apache/streams/elasticsearch/example/ElasticsearchHdfs.java b/local/elasticsearch-hdfs/src/main/java/org/apache/streams/elasticsearch/example/ElasticsearchHdfs.java
deleted file mode 100644
index da0acbd..0000000
--- a/local/elasticsearch-hdfs/src/main/java/org/apache/streams/elasticsearch/example/ElasticsearchHdfs.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.elasticsearch.example;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Maps;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.typesafe.config.Config;
-import org.apache.streams.config.ComponentConfigurator;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.elasticsearch.ElasticsearchPersistReader;
-import org.apache.streams.hdfs.WebHdfsPersistWriter;
-import org.apache.streams.core.StreamBuilder;
-import org.apache.streams.local.builders.LocalStreamBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-import java.util.concurrent.*;
-
-/**
- * Copies documents into a new index
- */
-public class ElasticsearchHdfs implements Runnable {
-
-    public final static String STREAMS_ID = "ElasticsearchHdfs";
-
-    private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchHdfs.class);
-
-    ElasticsearchHdfsConfiguration config;
-
-    public ElasticsearchHdfs() {
-       this(new ComponentConfigurator<>(ElasticsearchHdfsConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig()));
-
-    }
-
-    public ElasticsearchHdfs(ElasticsearchHdfsConfiguration reindex) {
-        this.config = reindex;
-    }
-
-    public static void main(String[] args)
-    {
-        LOGGER.info(StreamsConfigurator.config.toString());
-
-        ElasticsearchHdfs backup = new ElasticsearchHdfs();
-
-        new Thread(backup).start();
-
-    }
-
-    @Override
-    public void run() {
-
-        ElasticsearchPersistReader elasticsearchPersistReader = new ElasticsearchPersistReader(config.getSource());
-
-        WebHdfsPersistWriter hdfsPersistWriter = new WebHdfsPersistWriter(config.getDestination());
-
-        Map<String, Object> streamConfig = Maps.newHashMap();
-        streamConfig.put(LocalStreamBuilder.STREAM_IDENTIFIER_KEY, STREAMS_ID);
-        streamConfig.put(LocalStreamBuilder.TIMEOUT_KEY, 7 * 24 * 60 * 1000);
-        StreamBuilder builder = new LocalStreamBuilder(1000, streamConfig);
-
-        builder.newPerpetualStream(ElasticsearchPersistReader.STREAMS_ID, elasticsearchPersistReader);
-        builder.addStreamsPersistWriter(WebHdfsPersistWriter.STREAMS_ID, hdfsPersistWriter, 1, ElasticsearchPersistReader.STREAMS_ID);
-        builder.start();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/elasticsearch-hdfs/src/main/java/org/apache/streams/elasticsearch/example/HdfsElasticsearch.java
----------------------------------------------------------------------
diff --git a/local/elasticsearch-hdfs/src/main/java/org/apache/streams/elasticsearch/example/HdfsElasticsearch.java b/local/elasticsearch-hdfs/src/main/java/org/apache/streams/elasticsearch/example/HdfsElasticsearch.java
deleted file mode 100644
index 0a65479..0000000
--- a/local/elasticsearch-hdfs/src/main/java/org/apache/streams/elasticsearch/example/HdfsElasticsearch.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.elasticsearch.example;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Maps;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.typesafe.config.Config;
-import org.apache.streams.config.ComponentConfigurator;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.hdfs.WebHdfsPersistReader;
-import org.apache.streams.elasticsearch.ElasticsearchPersistWriter;
-import org.apache.streams.core.StreamBuilder;
-import org.apache.streams.local.builders.LocalStreamBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.math.BigInteger;
-import java.util.Map;
-import java.util.concurrent.*;
-
-/**
- * Copies documents into a new index
- */
-public class HdfsElasticsearch implements Runnable {
-
-    public final static String STREAMS_ID = "HdfsElasticsearch";
-
-    private final static Logger LOGGER = LoggerFactory.getLogger(HdfsElasticsearch.class);
-
-    HdfsElasticsearchConfiguration config;
-
-    public HdfsElasticsearch() {
-       this(new ComponentConfigurator<>(HdfsElasticsearchConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig()));
-
-    }
-
-    public HdfsElasticsearch(HdfsElasticsearchConfiguration reindex) {
-        this.config = reindex;
-    }
-
-    public static void main(String[] args)
-    {
-        LOGGER.info(StreamsConfigurator.config.toString());
-
-        HdfsElasticsearch restore = new HdfsElasticsearch();
-
-        new Thread(restore).start();
-
-    }
-
-    @Override
-    public void run() {
-
-        WebHdfsPersistReader webHdfsPersistReader = new WebHdfsPersistReader(config.getSource());
-
-        ElasticsearchPersistWriter elasticsearchPersistWriter = new ElasticsearchPersistWriter(config.getDestination());
-
-        Map<String, Object> streamConfig = Maps.newHashMap();
-        streamConfig.put(LocalStreamBuilder.STREAM_IDENTIFIER_KEY, STREAMS_ID);
-        streamConfig.put(LocalStreamBuilder.TIMEOUT_KEY, 1000 * 1000);
-        StreamBuilder builder = new LocalStreamBuilder(1000, streamConfig);
-
-        builder.newPerpetualStream(WebHdfsPersistReader.STREAMS_ID, webHdfsPersistReader);
-        builder.addStreamsPersistWriter(ElasticsearchPersistWriter.STREAMS_ID, elasticsearchPersistWriter, 1, WebHdfsPersistReader.STREAMS_ID);
-        builder.start();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/ElasticsearchHdfs.java
----------------------------------------------------------------------
diff --git a/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/ElasticsearchHdfs.java b/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/ElasticsearchHdfs.java
new file mode 100644
index 0000000..8d3cf36
--- /dev/null
+++ b/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/ElasticsearchHdfs.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.example;
+
+import com.google.common.collect.Maps;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.elasticsearch.ElasticsearchPersistReader;
+import org.apache.streams.example.ElasticsearchHdfsConfiguration;
+import org.apache.streams.hdfs.WebHdfsPersistWriter;
+import org.apache.streams.core.StreamBuilder;
+import org.apache.streams.local.builders.LocalStreamBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * Copies documents from an Elasticsearch index into HDFS
+ */
+public class ElasticsearchHdfs implements Runnable {
+
+    public final static String STREAMS_ID = "ElasticsearchHdfs";
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchHdfs.class);
+
+    ElasticsearchHdfsConfiguration config;
+
+    public ElasticsearchHdfs() {
+       this(new ComponentConfigurator<>(ElasticsearchHdfsConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig()));
+
+    }
+
+    public ElasticsearchHdfs(ElasticsearchHdfsConfiguration reindex) {
+        this.config = reindex;
+    }
+
+    public static void main(String[] args)
+    {
+        LOGGER.info(StreamsConfigurator.config.toString());
+
+        ElasticsearchHdfs backup = new ElasticsearchHdfs();
+
+        new Thread(backup).start();
+
+    }
+
+    @Override
+    public void run() {
+
+        ElasticsearchPersistReader elasticsearchPersistReader = new ElasticsearchPersistReader(config.getSource());
+
+        WebHdfsPersistWriter hdfsPersistWriter = new WebHdfsPersistWriter(config.getDestination());
+
+        Map<String, Object> streamConfig = Maps.newHashMap();
+        streamConfig.put(LocalStreamBuilder.STREAM_IDENTIFIER_KEY, STREAMS_ID);
+        streamConfig.put(LocalStreamBuilder.TIMEOUT_KEY, 7 * 24 * 60 * 1000);
+        StreamBuilder builder = new LocalStreamBuilder(1000, streamConfig);
+
+        builder.newPerpetualStream(ElasticsearchPersistReader.STREAMS_ID, elasticsearchPersistReader);
+        builder.addStreamsPersistWriter(WebHdfsPersistWriter.STREAMS_ID, hdfsPersistWriter, 1, ElasticsearchPersistReader.STREAMS_ID);
+        builder.start();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/HdfsElasticsearch.java
----------------------------------------------------------------------
diff --git a/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/HdfsElasticsearch.java b/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/HdfsElasticsearch.java
new file mode 100644
index 0000000..847ac48
--- /dev/null
+++ b/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/HdfsElasticsearch.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.example;
+
+import com.google.common.collect.Maps;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.example.HdfsElasticsearchConfiguration;
+import org.apache.streams.hdfs.WebHdfsPersistReader;
+import org.apache.streams.elasticsearch.ElasticsearchPersistWriter;
+import org.apache.streams.core.StreamBuilder;
+import org.apache.streams.local.builders.LocalStreamBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * Copies documents from HDFS into an Elasticsearch index
+ */
+public class HdfsElasticsearch implements Runnable {
+
+    public final static String STREAMS_ID = "HdfsElasticsearch";
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(HdfsElasticsearch.class);
+
+    HdfsElasticsearchConfiguration config;
+
+    public HdfsElasticsearch() {
+       this(new ComponentConfigurator<>(HdfsElasticsearchConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig()));
+
+    }
+
+    public HdfsElasticsearch(HdfsElasticsearchConfiguration reindex) {
+        this.config = reindex;
+    }
+
+    public static void main(String[] args)
+    {
+        LOGGER.info(StreamsConfigurator.config.toString());
+
+        HdfsElasticsearch restore = new HdfsElasticsearch();
+
+        new Thread(restore).start();
+
+    }
+
+    @Override
+    public void run() {
+
+        WebHdfsPersistReader webHdfsPersistReader = new WebHdfsPersistReader(config.getSource());
+
+        ElasticsearchPersistWriter elasticsearchPersistWriter = new ElasticsearchPersistWriter(config.getDestination());
+
+        Map<String, Object> streamConfig = Maps.newHashMap();
+        streamConfig.put(LocalStreamBuilder.STREAM_IDENTIFIER_KEY, STREAMS_ID);
+        streamConfig.put(LocalStreamBuilder.TIMEOUT_KEY, 1000 * 1000);
+        StreamBuilder builder = new LocalStreamBuilder(1000, streamConfig);
+
+        builder.newPerpetualStream(WebHdfsPersistReader.STREAMS_ID, webHdfsPersistReader);
+        builder.addStreamsPersistWriter(ElasticsearchPersistWriter.STREAMS_ID, elasticsearchPersistWriter, 1, WebHdfsPersistReader.STREAMS_ID);
+        builder.start();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/elasticsearch-hdfs/src/main/jsonschema/ElasticsearchHdfsConfiguration.json
----------------------------------------------------------------------
diff --git a/local/elasticsearch-hdfs/src/main/jsonschema/ElasticsearchHdfsConfiguration.json b/local/elasticsearch-hdfs/src/main/jsonschema/ElasticsearchHdfsConfiguration.json
index 9ad7e54..ee17a3d 100644
--- a/local/elasticsearch-hdfs/src/main/jsonschema/ElasticsearchHdfsConfiguration.json
+++ b/local/elasticsearch-hdfs/src/main/jsonschema/ElasticsearchHdfsConfiguration.json
@@ -4,7 +4,7 @@
     "http://www.apache.org/licenses/LICENSE-2.0"
   ],
   "type": "object",
-  "javaType" : "org.apache.streams.elasticsearch.example.ElasticsearchHdfsConfiguration",
+  "javaType" : "org.apache.streams.example.ElasticsearchHdfsConfiguration",
   "javaInterfaces": ["java.io.Serializable"],
   "properties": {
     "source": { "javaType": "org.apache.streams.elasticsearch.ElasticsearchReaderConfiguration", "type": "object", "required": true },

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/elasticsearch-hdfs/src/main/jsonschema/HdfsElasticsearchConfiguration.json
----------------------------------------------------------------------
diff --git a/local/elasticsearch-hdfs/src/main/jsonschema/HdfsElasticsearchConfiguration.json b/local/elasticsearch-hdfs/src/main/jsonschema/HdfsElasticsearchConfiguration.json
index 8b77225..4239279 100644
--- a/local/elasticsearch-hdfs/src/main/jsonschema/HdfsElasticsearchConfiguration.json
+++ b/local/elasticsearch-hdfs/src/main/jsonschema/HdfsElasticsearchConfiguration.json
@@ -4,7 +4,7 @@
     "http://www.apache.org/licenses/LICENSE-2.0"
   ],
   "type": "object",
-  "javaType" : "org.apache.streams.elasticsearch.example.HdfsElasticsearchConfiguration",
+  "javaType" : "org.apache.streams.example.HdfsElasticsearchConfiguration",
   "javaInterfaces": ["java.io.Serializable"],
   "properties": {
     "source": { "javaType": "org.apache.streams.hdfs.HdfsReaderConfiguration", "type": "object", "required": true },

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/elasticsearch/test/ElasticsearchHdfsIT.java
----------------------------------------------------------------------
diff --git a/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/elasticsearch/test/ElasticsearchHdfsIT.java b/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/elasticsearch/test/ElasticsearchHdfsIT.java
deleted file mode 100644
index 8dfe244..0000000
--- a/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/elasticsearch/test/ElasticsearchHdfsIT.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.example.elasticsearch.test;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.Uninterruptibles;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigParseOptions;
-import org.apache.commons.io.Charsets;
-import org.apache.commons.io.IOUtils;
-import org.apache.streams.config.ComponentConfigurator;
-import org.apache.streams.config.StreamsConfiguration;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.elasticsearch.ElasticsearchClientManager;
-import org.apache.streams.elasticsearch.ElasticsearchConfiguration;
-import org.apache.streams.elasticsearch.ElasticsearchPersistWriter;
-import org.apache.streams.elasticsearch.ElasticsearchReaderConfiguration;
-import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration;
-import org.apache.streams.elasticsearch.example.ElasticsearchHdfs;
-import org.apache.streams.elasticsearch.example.ElasticsearchHdfsConfiguration;
-import org.apache.streams.elasticsearch.example.HdfsElasticsearch;
-import org.apache.streams.elasticsearch.example.HdfsElasticsearchConfiguration;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.pojo.json.Activity;
-import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
-import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
-import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
-import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
-import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
-import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
-import org.elasticsearch.action.search.SearchRequestBuilder;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.client.Requests;
-import org.elasticsearch.cluster.health.ClusterHealthStatus;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.InputStream;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-
-import static junit.framework.TestCase.assertTrue;
-import static org.junit.Assert.assertNotEquals;
-
-/**
- * Test copying documents between hdfs and elasticsearch
- */
-public class ElasticsearchHdfsIT {
-
-    private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchHdfsIT.class);
-
-    ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
-
-    protected ElasticsearchHdfsConfiguration testConfiguration;
-    protected Client testClient;
-
-    private int count = 0;
-
-    @Before
-    public void prepareTest() throws Exception {
-
-        Config reference  = ConfigFactory.load();
-        File conf_file = new File("target/test-classes/ElasticsearchHdfsIT.conf");
-        assert(conf_file.exists());
-        Config testResourceConfig  = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
-        Properties es_properties  = new Properties();
-        InputStream es_stream  = new FileInputStream("elasticsearch.properties");
-        es_properties.load(es_stream);
-        Config esProps  = ConfigFactory.parseProperties(es_properties);
-        Config typesafe  = testResourceConfig.withFallback(esProps).withFallback(reference).resolve();
-        StreamsConfiguration streams  = StreamsConfigurator.detectConfiguration(typesafe);
-        testConfiguration = new ComponentConfigurator<>(ElasticsearchHdfsConfiguration.class).detectConfiguration(typesafe);
-        testClient = new ElasticsearchClientManager(testConfiguration.getSource()).getClient();
-
-        ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
-        ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
-        assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED);
-
-        IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getSource().getIndexes().get(0));
-        IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
-        assertTrue(indicesExistsResponse.isExists());
-
-        SearchRequestBuilder countRequest = testClient
-                .prepareSearch(testConfiguration.getSource().getIndexes().get(0))
-                .setTypes(testConfiguration.getSource().getTypes().get(0));
-        SearchResponse countResponse = countRequest.execute().actionGet();
-
-        count = (int)countResponse.getHits().getTotalHits();
-
-        assertNotEquals(count, 0);
-    }
-
-    @Test
-    public void ElasticsearchHdfsIT() throws Exception {
-
-        ElasticsearchHdfs backup = new ElasticsearchHdfs(testConfiguration);
-
-        backup.run();
-
-        // assert lines in file
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/elasticsearch/test/ExampleITs.java
----------------------------------------------------------------------
diff --git a/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/elasticsearch/test/ExampleITs.java b/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/elasticsearch/test/ExampleITs.java
deleted file mode 100644
index ab882c8..0000000
--- a/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/elasticsearch/test/ExampleITs.java
+++ /dev/null
@@ -1,17 +0,0 @@
-package org.apache.streams.example.elasticsearch.test;
-
-import org.apache.streams.elasticsearch.test.ElasticsearchPersistWriterIT;
-import org.junit.runner.RunWith;
-import org.junit.runners.Suite;
-
-@RunWith(Suite.class)
-@Suite.SuiteClasses({
-        ElasticsearchPersistWriterIT.class,
-        ElasticsearchHdfsIT.class,
-        HdfsElasticsearchIT.class,
-})
-
-public class ExampleITs {
-    // the class remains empty,
-    // used only as a holder for the above annotations
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/elasticsearch/test/HdfsElasticsearchIT.java
----------------------------------------------------------------------
diff --git a/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/elasticsearch/test/HdfsElasticsearchIT.java b/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/elasticsearch/test/HdfsElasticsearchIT.java
deleted file mode 100644
index 1a055f6..0000000
--- a/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/elasticsearch/test/HdfsElasticsearchIT.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.example.elasticsearch.test;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.Uninterruptibles;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigParseOptions;
-import org.apache.commons.io.Charsets;
-import org.apache.commons.io.IOUtils;
-import org.apache.streams.config.ComponentConfigurator;
-import org.apache.streams.config.StreamsConfiguration;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.elasticsearch.ElasticsearchClientManager;
-import org.apache.streams.elasticsearch.ElasticsearchConfiguration;
-import org.apache.streams.elasticsearch.ElasticsearchPersistWriter;
-import org.apache.streams.elasticsearch.ElasticsearchReaderConfiguration;
-import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration;
-import org.apache.streams.elasticsearch.example.ElasticsearchHdfs;
-import org.apache.streams.elasticsearch.example.ElasticsearchHdfsConfiguration;
-import org.apache.streams.elasticsearch.example.HdfsElasticsearch;
-import org.apache.streams.elasticsearch.example.HdfsElasticsearchConfiguration;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.pojo.json.Activity;
-import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
-import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
-import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
-import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
-import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
-import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
-import org.elasticsearch.action.search.SearchRequestBuilder;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.client.Requests;
-import org.elasticsearch.cluster.health.ClusterHealthStatus;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.InputStream;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-
-import static junit.framework.TestCase.assertTrue;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-
-/**
- * Test copying documents between hdfs and elasticsearch
- */
-public class HdfsElasticsearchIT {
-
-    private final static Logger LOGGER = LoggerFactory.getLogger(HdfsElasticsearchIT.class);
-
-    ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
-
-    protected HdfsElasticsearchConfiguration testConfiguration;
-    protected Client testClient;
-
-    @Before
-    public void prepareTest() throws Exception {
-
-        Config reference  = ConfigFactory.load();
-        File conf_file = new File("target/test-classes/HdfsElasticsearchIT.conf");
-        assert(conf_file.exists());
-        Config testResourceConfig  = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
-        Properties es_properties  = new Properties();
-        InputStream es_stream  = new FileInputStream("elasticsearch.properties");
-        es_properties.load(es_stream);
-        Config esProps  = ConfigFactory.parseProperties(es_properties);
-        Config typesafe  = testResourceConfig.withFallback(esProps).withFallback(reference).resolve();
-        StreamsConfiguration streams  = StreamsConfigurator.detectConfiguration(typesafe);
-        testConfiguration = new ComponentConfigurator<>(HdfsElasticsearchConfiguration.class).detectConfiguration(typesafe);
-        testClient = new ElasticsearchClientManager(testConfiguration.getDestination()).getClient();
-
-        ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
-        ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
-        assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED);
-
-        IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getDestination().getIndex());
-        IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
-        if(indicesExistsResponse.isExists()) {
-            DeleteIndexRequest deleteIndexRequest = Requests.deleteIndexRequest(testConfiguration.getDestination().getIndex());
-            DeleteIndexResponse deleteIndexResponse = testClient.admin().indices().delete(deleteIndexRequest).actionGet();
-            assertTrue(deleteIndexResponse.isAcknowledged());
-        };
-    }
-
-    @Test
-    public void ElasticsearchHdfsIT() throws Exception {
-
-        HdfsElasticsearch restore = new HdfsElasticsearch(testConfiguration);
-
-        restore.run();
-
-        IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getDestination().getIndex());
-        IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
-        assertTrue(indicesExistsResponse.isExists());
-
-        SearchRequestBuilder countRequest = testClient
-                .prepareSearch(testConfiguration.getDestination().getIndex())
-                .setTypes(testConfiguration.getDestination().getType());
-        SearchResponse countResponse = countRequest.execute().actionGet();
-
-        assertEquals(89, countResponse.getHits().getTotalHits());
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/ElasticsearchHdfsIT.java
----------------------------------------------------------------------
diff --git a/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/ElasticsearchHdfsIT.java b/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/ElasticsearchHdfsIT.java
new file mode 100644
index 0000000..8e87f3a
--- /dev/null
+++ b/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/ElasticsearchHdfsIT.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.example.test;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfiguration;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.elasticsearch.ElasticsearchClientManager;
+import org.apache.streams.example.ElasticsearchHdfs;
+import org.apache.streams.example.ElasticsearchHdfsConfiguration;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
+import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
+import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
+import org.elasticsearch.action.search.SearchRequestBuilder;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.Requests;
+import org.elasticsearch.cluster.health.ClusterHealthStatus;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.Properties;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertNotEquals;
+
+/**
+ * Test copying documents between hdfs and elasticsearch
+ */
+public class ElasticsearchHdfsIT {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchHdfsIT.class);
+
+    ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+
+    protected ElasticsearchHdfsConfiguration testConfiguration;
+    protected Client testClient;
+
+    private int count = 0;
+
+    @Before
+    public void prepareTest() throws Exception {
+
+        Config reference  = ConfigFactory.load();
+        File conf_file = new File("target/test-classes/ElasticsearchHdfsIT.conf");
+        assert(conf_file.exists());
+        Config testResourceConfig  = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+        Properties es_properties  = new Properties();
+        InputStream es_stream  = new FileInputStream("elasticsearch.properties");
+        es_properties.load(es_stream);
+        Config esProps  = ConfigFactory.parseProperties(es_properties);
+        Config typesafe  = testResourceConfig.withFallback(esProps).withFallback(reference).resolve();
+        StreamsConfiguration streams  = StreamsConfigurator.detectConfiguration(typesafe);
+        testConfiguration = new ComponentConfigurator<>(ElasticsearchHdfsConfiguration.class).detectConfiguration(typesafe);
+        testClient = new ElasticsearchClientManager(testConfiguration.getSource()).getClient();
+
+        ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
+        ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
+        assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED);
+
+        IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getSource().getIndexes().get(0));
+        IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
+        assertTrue(indicesExistsResponse.isExists());
+
+        SearchRequestBuilder countRequest = testClient
+                .prepareSearch(testConfiguration.getSource().getIndexes().get(0))
+                .setTypes(testConfiguration.getSource().getTypes().get(0));
+        SearchResponse countResponse = countRequest.execute().actionGet();
+
+        count = (int)countResponse.getHits().getTotalHits();
+
+        assertNotEquals(count, 0);
+    }
+
+    @Test
+    public void ElasticsearchHdfsIT() throws Exception {
+
+        ElasticsearchHdfs backup = new ElasticsearchHdfs(testConfiguration);
+
+        backup.run();
+
+        // assert lines in file
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/ExampleITs.java
----------------------------------------------------------------------
diff --git a/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/ExampleITs.java b/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/ExampleITs.java
new file mode 100644
index 0000000..5965914
--- /dev/null
+++ b/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/ExampleITs.java
@@ -0,0 +1,17 @@
+package org.apache.streams.example.test;
+
+import org.apache.streams.elasticsearch.test.ElasticsearchPersistWriterIT;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+@RunWith(Suite.class)
+@Suite.SuiteClasses({
+        ElasticsearchPersistWriterIT.class,
+        ElasticsearchHdfsIT.class,
+        HdfsElasticsearchIT.class,
+})
+
+public class ExampleITs {
+    // the class remains empty,
+    // used only as a holder for the above annotations
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/HdfsElasticsearchIT.java
----------------------------------------------------------------------
diff --git a/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/HdfsElasticsearchIT.java b/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/HdfsElasticsearchIT.java
new file mode 100644
index 0000000..4eb7fc0
--- /dev/null
+++ b/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/HdfsElasticsearchIT.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.example.test;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfiguration;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.elasticsearch.ElasticsearchClientManager;
+import org.apache.streams.example.HdfsElasticsearch;
+import org.apache.streams.example.HdfsElasticsearchConfiguration;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
+import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
+import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
+import org.elasticsearch.action.search.SearchRequestBuilder;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.Requests;
+import org.elasticsearch.cluster.health.ClusterHealthStatus;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.Properties;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+/**
+ * Test copying documents between hdfs and elasticsearch
+ */
+public class HdfsElasticsearchIT {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(HdfsElasticsearchIT.class);
+
+    ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+
+    protected HdfsElasticsearchConfiguration testConfiguration;
+    protected Client testClient;
+
+    @Before
+    public void prepareTest() throws Exception {
+
+        Config reference  = ConfigFactory.load();
+        File conf_file = new File("target/test-classes/HdfsElasticsearchIT.conf");
+        assert(conf_file.exists());
+        Config testResourceConfig  = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+        Properties es_properties  = new Properties();
+        InputStream es_stream  = new FileInputStream("elasticsearch.properties");
+        es_properties.load(es_stream);
+        Config esProps  = ConfigFactory.parseProperties(es_properties);
+        Config typesafe  = testResourceConfig.withFallback(esProps).withFallback(reference).resolve();
+        StreamsConfiguration streams  = StreamsConfigurator.detectConfiguration(typesafe);
+        testConfiguration = new ComponentConfigurator<>(HdfsElasticsearchConfiguration.class).detectConfiguration(typesafe);
+        testClient = new ElasticsearchClientManager(testConfiguration.getDestination()).getClient();
+
+        ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
+        ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
+        assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED);
+
+        IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getDestination().getIndex());
+        IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
+        if(indicesExistsResponse.isExists()) {
+            DeleteIndexRequest deleteIndexRequest = Requests.deleteIndexRequest(testConfiguration.getDestination().getIndex());
+            DeleteIndexResponse deleteIndexResponse = testClient.admin().indices().delete(deleteIndexRequest).actionGet();
+            assertTrue(deleteIndexResponse.isAcknowledged());
+        };
+    }
+
+    @Test
+    public void ElasticsearchHdfsIT() throws Exception {
+
+        HdfsElasticsearch restore = new HdfsElasticsearch(testConfiguration);
+
+        restore.run();
+
+        IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getDestination().getIndex());
+        IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
+        assertTrue(indicesExistsResponse.isExists());
+
+        SearchRequestBuilder countRequest = testClient
+                .prepareSearch(testConfiguration.getDestination().getIndex())
+                .setTypes(testConfiguration.getDestination().getType());
+        SearchResponse countResponse = countRequest.execute().actionGet();
+
+        assertEquals(89, countResponse.getHits().getTotalHits());
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/elasticsearch-reindex/src/main/java/org/apache/streams/elasticsearch/example/ElasticsearchReindex.java
----------------------------------------------------------------------
diff --git a/local/elasticsearch-reindex/src/main/java/org/apache/streams/elasticsearch/example/ElasticsearchReindex.java b/local/elasticsearch-reindex/src/main/java/org/apache/streams/elasticsearch/example/ElasticsearchReindex.java
deleted file mode 100644
index dc94773..0000000
--- a/local/elasticsearch-reindex/src/main/java/org/apache/streams/elasticsearch/example/ElasticsearchReindex.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.elasticsearch.example;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Maps;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.typesafe.config.Config;
-import org.apache.streams.config.ComponentConfigurator;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.elasticsearch.*;
-import org.apache.streams.core.StreamBuilder;
-import org.apache.streams.local.builders.LocalStreamBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-import java.util.concurrent.*;
-
-/**
- * Copies documents into a new index
- */
-public class ElasticsearchReindex implements Runnable {
-
-    public final static String STREAMS_ID = "ElasticsearchReindex";
-
-    private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchReindex.class);
-
-    ElasticsearchReindexConfiguration config;
-
-    public ElasticsearchReindex() {
-       this(new ComponentConfigurator<>(ElasticsearchReindexConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig()));
-
-    }
-
-    public ElasticsearchReindex(ElasticsearchReindexConfiguration reindex) {
-        this.config = reindex;
-    }
-
-    public static void main(String[] args)
-    {
-        LOGGER.info(StreamsConfigurator.config.toString());
-
-        ElasticsearchReindex reindex = new ElasticsearchReindex();
-
-        new Thread(reindex).start();
-
-    }
-
-    @Override
-    public void run() {
-
-        ElasticsearchPersistReader elasticsearchPersistReader = new ElasticsearchPersistReader(config.getSource());
-
-        ElasticsearchPersistWriter elasticsearchPersistWriter = new ElasticsearchPersistWriter(config.getDestination());
-
-        Map<String, Object> streamConfig = Maps.newHashMap();
-        streamConfig.put(LocalStreamBuilder.STREAM_IDENTIFIER_KEY, STREAMS_ID);
-        streamConfig.put(LocalStreamBuilder.TIMEOUT_KEY, 7 * 24 * 60 * 1000);
-        StreamBuilder builder = new LocalStreamBuilder(1000, streamConfig);
-
-        builder.newPerpetualStream(ElasticsearchPersistReader.STREAMS_ID, elasticsearchPersistReader);
-        builder.addStreamsPersistWriter(ElasticsearchPersistWriter.STREAMS_ID, elasticsearchPersistWriter, 1, ElasticsearchPersistReader.STREAMS_ID);
-        builder.start();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/elasticsearch-reindex/src/main/java/org/apache/streams/example/ElasticsearchReindex.java
----------------------------------------------------------------------
diff --git a/local/elasticsearch-reindex/src/main/java/org/apache/streams/example/ElasticsearchReindex.java b/local/elasticsearch-reindex/src/main/java/org/apache/streams/example/ElasticsearchReindex.java
new file mode 100644
index 0000000..dfb2a98
--- /dev/null
+++ b/local/elasticsearch-reindex/src/main/java/org/apache/streams/example/ElasticsearchReindex.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.example;
+
+import com.google.common.collect.Maps;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamBuilder;
+import org.apache.streams.elasticsearch.ElasticsearchPersistReader;
+import org.apache.streams.elasticsearch.ElasticsearchPersistWriter;
+import org.apache.streams.example.ElasticsearchReindexConfiguration;
+import org.apache.streams.local.builders.LocalStreamBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * Copies documents into a new index
+ */
+public class ElasticsearchReindex implements Runnable {
+
+    public final static String STREAMS_ID = "ElasticsearchReindex";
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchReindex.class);
+
+    ElasticsearchReindexConfiguration config;
+
+    public ElasticsearchReindex() {
+       this(new ComponentConfigurator<>(ElasticsearchReindexConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig()));
+
+    }
+
+    public ElasticsearchReindex(ElasticsearchReindexConfiguration reindex) {
+        this.config = reindex;
+    }
+
+    public static void main(String[] args)
+    {
+        LOGGER.info(StreamsConfigurator.config.toString());
+
+        ElasticsearchReindex reindex = new ElasticsearchReindex();
+
+        new Thread(reindex).start();
+
+    }
+
+    @Override
+    public void run() {
+
+        ElasticsearchPersistReader elasticsearchPersistReader = new ElasticsearchPersistReader(config.getSource());
+
+        ElasticsearchPersistWriter elasticsearchPersistWriter = new ElasticsearchPersistWriter(config.getDestination());
+
+        Map<String, Object> streamConfig = Maps.newHashMap();
+        streamConfig.put(LocalStreamBuilder.STREAM_IDENTIFIER_KEY, STREAMS_ID);
+        streamConfig.put(LocalStreamBuilder.TIMEOUT_KEY, 7 * 24 * 60 * 1000);
+        StreamBuilder builder = new LocalStreamBuilder(1000, streamConfig);
+
+        builder.newPerpetualStream(ElasticsearchPersistReader.STREAMS_ID, elasticsearchPersistReader);
+        builder.addStreamsPersistWriter(ElasticsearchPersistWriter.STREAMS_ID, elasticsearchPersistWriter, 1, ElasticsearchPersistReader.STREAMS_ID);
+        builder.start();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/elasticsearch-reindex/src/main/jsonschema/ElasticsearchReindexConfiguration.json
----------------------------------------------------------------------
diff --git a/local/elasticsearch-reindex/src/main/jsonschema/ElasticsearchReindexConfiguration.json b/local/elasticsearch-reindex/src/main/jsonschema/ElasticsearchReindexConfiguration.json
index 1237538..09bdf5b 100644
--- a/local/elasticsearch-reindex/src/main/jsonschema/ElasticsearchReindexConfiguration.json
+++ b/local/elasticsearch-reindex/src/main/jsonschema/ElasticsearchReindexConfiguration.json
@@ -4,7 +4,7 @@
     "http://www.apache.org/licenses/LICENSE-2.0"
   ],
   "type": "object",
-  "javaType" : "org.apache.streams.elasticsearch.example.ElasticsearchReindexConfiguration",
+  "javaType" : "org.apache.streams.example.ElasticsearchReindexConfiguration",
   "javaInterfaces": ["java.io.Serializable"],
   "properties": {
     "source": { "javaType": "org.apache.streams.elasticsearch.ElasticsearchReaderConfiguration", "type": "object", "required": true },

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/elasticsearch/test/ElasticsearchReindexChildIT.java
----------------------------------------------------------------------
diff --git a/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/elasticsearch/test/ElasticsearchReindexChildIT.java b/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/elasticsearch/test/ElasticsearchReindexChildIT.java
deleted file mode 100644
index d033014..0000000
--- a/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/elasticsearch/test/ElasticsearchReindexChildIT.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.example.elasticsearch.test;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigParseOptions;
-import org.apache.streams.config.ComponentConfigurator;
-import org.apache.streams.config.StreamsConfiguration;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.elasticsearch.ElasticsearchClientManager;
-import org.apache.streams.elasticsearch.example.ElasticsearchReindex;
-import org.apache.streams.elasticsearch.example.ElasticsearchReindexConfiguration;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
-import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
-import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
-import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
-import org.elasticsearch.action.search.SearchRequestBuilder;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.client.Requests;
-import org.elasticsearch.cluster.health.ClusterHealthStatus;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.InputStream;
-import java.util.Properties;
-
-import static junit.framework.TestCase.assertTrue;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-
-/**
- * Test copying parent/child associated documents between two indexes on same cluster
- */
-public class ElasticsearchReindexChildIT {
-
-    private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchReindexIT.class);
-
-    ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
-
-    protected ElasticsearchReindexConfiguration testConfiguration;
-    protected Client testClient;
-
-    private int count = 0;
-
-    @Before
-    public void prepareTest() throws Exception {
-
-        Config reference  = ConfigFactory.load();
-        File conf_file = new File("target/test-classes/ElasticsearchReindexChildIT.conf");
-        assert(conf_file.exists());
-        Config testResourceConfig  = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
-        Properties es_properties  = new Properties();
-        InputStream es_stream  = new FileInputStream("elasticsearch.properties");
-        es_properties.load(es_stream);
-        Config esProps  = ConfigFactory.parseProperties(es_properties);
-        Config typesafe  = testResourceConfig.withFallback(esProps).withFallback(reference).resolve();
-        StreamsConfiguration streams  = StreamsConfigurator.detectConfiguration(typesafe);
-        testConfiguration = new ComponentConfigurator<>(ElasticsearchReindexConfiguration.class).detectConfiguration(typesafe);
-        testClient = new ElasticsearchClientManager(testConfiguration.getSource()).getClient();
-
-        ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
-        ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
-        assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED);
-
-        IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getSource().getIndexes().get(0));
-        IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
-        assertTrue(indicesExistsResponse.isExists());
-
-        SearchRequestBuilder countRequest = testClient
-                .prepareSearch(testConfiguration.getSource().getIndexes().get(0))
-                .setTypes(testConfiguration.getSource().getTypes().get(0));
-        SearchResponse countResponse = countRequest.execute().actionGet();
-
-        count = (int)countResponse.getHits().getTotalHits();
-
-        assertNotEquals(count, 0);
-
-    }
-
-    @Test
-    public void testReindex() throws Exception {
-
-        ElasticsearchReindex reindex = new ElasticsearchReindex(testConfiguration);
-
-        reindex.run();
-
-        // assert lines in file
-        SearchRequestBuilder countRequest = testClient
-                .prepareSearch(testConfiguration.getDestination().getIndex())
-                .setTypes(testConfiguration.getDestination().getType());
-        SearchResponse countResponse = countRequest.execute().actionGet();
-
-        assertEquals(count, (int)countResponse.getHits().getTotalHits());
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/elasticsearch/test/ElasticsearchReindexIT.java
----------------------------------------------------------------------
diff --git a/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/elasticsearch/test/ElasticsearchReindexIT.java b/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/elasticsearch/test/ElasticsearchReindexIT.java
deleted file mode 100644
index 5854ac0..0000000
--- a/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/elasticsearch/test/ElasticsearchReindexIT.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.example.elasticsearch.test;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigParseOptions;
-import org.apache.streams.config.ComponentConfigurator;
-import org.apache.streams.config.StreamsConfiguration;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.elasticsearch.ElasticsearchClientManager;
-import org.apache.streams.elasticsearch.example.ElasticsearchReindex;
-import org.apache.streams.elasticsearch.example.ElasticsearchReindexConfiguration;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
-import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
-import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
-import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
-import org.elasticsearch.action.search.SearchRequestBuilder;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.client.Requests;
-import org.elasticsearch.cluster.health.ClusterHealthStatus;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.InputStream;
-import java.util.Properties;
-
-import static junit.framework.TestCase.assertTrue;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-
-/**
- * Test copying documents between two indexes on same cluster
- */
-public class ElasticsearchReindexIT {
-
-    private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchReindexIT.class);
-
-    ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
-
-    protected ElasticsearchReindexConfiguration testConfiguration;
-    protected Client testClient;
-
-    private int count = 0;
-
-    @Before
-    public void prepareTest() throws Exception {
-
-        Config reference  = ConfigFactory.load();
-        File conf_file = new File("target/test-classes/ElasticsearchReindexIT.conf");
-        assert(conf_file.exists());
-        Config testResourceConfig  = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
-        Properties es_properties  = new Properties();
-        InputStream es_stream  = new FileInputStream("elasticsearch.properties");
-        es_properties.load(es_stream);
-        Config esProps  = ConfigFactory.parseProperties(es_properties);
-        Config typesafe  = testResourceConfig.withFallback(esProps).withFallback(reference).resolve();
-        StreamsConfiguration streams  = StreamsConfigurator.detectConfiguration(typesafe);
-        testConfiguration = new ComponentConfigurator<>(ElasticsearchReindexConfiguration.class).detectConfiguration(typesafe);
-        testClient = new ElasticsearchClientManager(testConfiguration.getSource()).getClient();
-
-        ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
-        ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
-        assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED);
-
-        IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getSource().getIndexes().get(0));
-        IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
-        assertTrue(indicesExistsResponse.isExists());
-
-        SearchRequestBuilder countRequest = testClient
-                .prepareSearch(testConfiguration.getSource().getIndexes().get(0))
-                .setTypes(testConfiguration.getSource().getTypes().get(0));
-        SearchResponse countResponse = countRequest.execute().actionGet();
-
-        count = (int)countResponse.getHits().getTotalHits();
-
-        assertNotEquals(count, 0);
-
-    }
-
-    @Test
-    public void testReindex() throws Exception {
-
-        ElasticsearchReindex reindex = new ElasticsearchReindex(testConfiguration);
-
-        reindex.run();
-
-        // assert lines in file
-        SearchRequestBuilder countRequest = testClient
-                .prepareSearch(testConfiguration.getDestination().getIndex())
-                .setTypes(testConfiguration.getDestination().getType());
-        SearchResponse countResponse = countRequest.execute().actionGet();
-
-        assertEquals(count, (int)countResponse.getHits().getTotalHits());
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/elasticsearch/test/ElasticsearchReindexParentIT.java
----------------------------------------------------------------------
diff --git a/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/elasticsearch/test/ElasticsearchReindexParentIT.java b/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/elasticsearch/test/ElasticsearchReindexParentIT.java
deleted file mode 100644
index 90924e7..0000000
--- a/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/elasticsearch/test/ElasticsearchReindexParentIT.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.example.elasticsearch.test;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigParseOptions;
-import org.apache.streams.config.ComponentConfigurator;
-import org.apache.streams.config.StreamsConfiguration;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.elasticsearch.ElasticsearchClientManager;
-import org.apache.streams.elasticsearch.example.ElasticsearchReindex;
-import org.apache.streams.elasticsearch.example.ElasticsearchReindexConfiguration;
-import org.apache.streams.elasticsearch.test.ElasticsearchParentChildWriterIT;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
-import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
-import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
-import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
-import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequestBuilder;
-import org.elasticsearch.action.search.SearchRequestBuilder;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.client.Requests;
-import org.elasticsearch.cluster.health.ClusterHealthStatus;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.InputStream;
-import java.net.URL;
-import java.util.Properties;
-
-import static junit.framework.TestCase.assertTrue;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-
-/**
- * Test copying parent/child associated documents between two indexes on same cluster
- */
-public class ElasticsearchReindexParentIT {
-
-    private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchReindexIT.class);
-
-    ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
-
-    protected ElasticsearchReindexConfiguration testConfiguration;
-    protected Client testClient;
-
-    private int count = 0;
-
-    @Before
-    public void prepareTest() throws Exception {
-
-        Config reference  = ConfigFactory.load();
-        File conf_file = new File("target/test-classes/ElasticsearchReindexParentIT.conf");
-        assert(conf_file.exists());
-        Config testResourceConfig  = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
-        Properties es_properties  = new Properties();
-        InputStream es_stream  = new FileInputStream("elasticsearch.properties");
-        es_properties.load(es_stream);
-        Config esProps  = ConfigFactory.parseProperties(es_properties);
-        Config typesafe  = testResourceConfig.withFallback(esProps).withFallback(reference).resolve();
-        StreamsConfiguration streams  = StreamsConfigurator.detectConfiguration(typesafe);
-        testConfiguration = new ComponentConfigurator<>(ElasticsearchReindexConfiguration.class).detectConfiguration(typesafe);
-        testClient = new ElasticsearchClientManager(testConfiguration.getSource()).getClient();
-
-        ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
-        ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
-        assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED);
-
-        IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getSource().getIndexes().get(0));
-        IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
-        assertTrue(indicesExistsResponse.isExists());
-
-        SearchRequestBuilder countRequest = testClient
-                .prepareSearch(testConfiguration.getSource().getIndexes().get(0))
-                .setTypes(testConfiguration.getSource().getTypes().get(0));
-        SearchResponse countResponse = countRequest.execute().actionGet();
-
-        count = (int)countResponse.getHits().getTotalHits();
-
-        PutIndexTemplateRequestBuilder putTemplateRequestBuilder = testClient.admin().indices().preparePutTemplate("mappings");
-        URL templateURL = ElasticsearchParentChildWriterIT.class.getResource("/ActivityChildObjectParent.json");
-        ObjectNode template = MAPPER.readValue(templateURL, ObjectNode.class);
-        String templateSource = MAPPER.writeValueAsString(template);
-        putTemplateRequestBuilder.setSource(templateSource);
-
-        testClient.admin().indices().putTemplate(putTemplateRequestBuilder.request()).actionGet();
-
-        assertNotEquals(count, 0);
-
-    }
-
-    @Test
-    public void testReindex() throws Exception {
-
-        ElasticsearchReindex reindex = new ElasticsearchReindex(testConfiguration);
-
-        reindex.run();
-
-        // assert lines in file
-        SearchRequestBuilder countRequest = testClient
-                .prepareSearch(testConfiguration.getDestination().getIndex())
-                .setTypes(testConfiguration.getDestination().getType());
-        SearchResponse countResponse = countRequest.execute().actionGet();
-
-        assertEquals(count, (int)countResponse.getHits().getTotalHits());
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/elasticsearch/test/ReindexITs.java
----------------------------------------------------------------------
diff --git a/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/elasticsearch/test/ReindexITs.java b/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/elasticsearch/test/ReindexITs.java
deleted file mode 100644
index 9c46d31..0000000
--- a/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/elasticsearch/test/ReindexITs.java
+++ /dev/null
@@ -1,20 +0,0 @@
-package org.apache.streams.example.elasticsearch.test;
-
-import org.apache.streams.elasticsearch.test.ElasticsearchParentChildWriterIT;
-import org.apache.streams.elasticsearch.test.ElasticsearchPersistWriterIT;
-import org.junit.runner.RunWith;
-import org.junit.runners.Suite;
-
-@RunWith(Suite.class)
-@Suite.SuiteClasses({
-        ElasticsearchPersistWriterIT.class,
-        ElasticsearchParentChildWriterIT.class,
-        ElasticsearchReindexIT.class,
-        ElasticsearchReindexParentIT.class,
-        ElasticsearchReindexChildIT.class
-})
-
-public class ReindexITs {
-    // the class remains empty,
-    // used only as a holder for the above annotations
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/5b96588c/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/test/ElasticsearchReindexChildIT.java
----------------------------------------------------------------------
diff --git a/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/test/ElasticsearchReindexChildIT.java b/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/test/ElasticsearchReindexChildIT.java
new file mode 100644
index 0000000..47c8f51
--- /dev/null
+++ b/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/test/ElasticsearchReindexChildIT.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.example.test;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfiguration;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.elasticsearch.ElasticsearchClientManager;
+import org.apache.streams.example.ElasticsearchReindex;
+import org.apache.streams.example.ElasticsearchReindexConfiguration;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
+import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
+import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
+import org.elasticsearch.action.search.SearchRequestBuilder;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.Requests;
+import org.elasticsearch.cluster.health.ClusterHealthStatus;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.Properties;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+/**
+ * Test copying parent/child associated documents between two indexes on same cluster.
+ * Settings come from ElasticsearchReindexChildIT.conf plus elasticsearch.properties.
+ */
+public class ElasticsearchReindexChildIT {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchReindexChildIT.class);
+
+    ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+
+    protected ElasticsearchReindexConfiguration testConfiguration;
+    protected Client testClient;
+
+    // source document count captured by prepareTest(), compared against the destination in testReindex()
+    private int count = 0;
+
+    /** Builds the test configuration and client, then records how many documents the source index/type holds. */
+    @Before
+    public void prepareTest() throws Exception {
+        Config reference  = ConfigFactory.load();
+        File conf_file = new File("target/test-classes/ElasticsearchReindexChildIT.conf");
+        assertTrue(conf_file.exists());
+        Config testResourceConfig  = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+        Properties es_properties  = new Properties();
+        // try-with-resources so the properties file handle is released after loading
+        try (InputStream es_stream = new FileInputStream("elasticsearch.properties")) {
+            es_properties.load(es_stream);
+        }
+        Config esProps  = ConfigFactory.parseProperties(es_properties);
+        Config typesafe  = testResourceConfig.withFallback(esProps).withFallback(reference).resolve();
+        StreamsConfiguration streams  = StreamsConfigurator.detectConfiguration(typesafe);
+        testConfiguration = new ComponentConfigurator<>(ElasticsearchReindexConfiguration.class).detectConfiguration(typesafe);
+        testClient = new ElasticsearchClientManager(testConfiguration.getSource()).getClient();
+
+        ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
+        ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
+        assertNotEquals(ClusterHealthStatus.RED, clusterHealthResponse.getStatus());
+
+        IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getSource().getIndexes().get(0));
+        IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
+        assertTrue(indicesExistsResponse.isExists());
+
+        SearchRequestBuilder countRequest = testClient
+                .prepareSearch(testConfiguration.getSource().getIndexes().get(0))
+                .setTypes(testConfiguration.getSource().getTypes().get(0));
+        SearchResponse countResponse = countRequest.execute().actionGet();
+
+        count = (int)countResponse.getHits().getTotalHits();
+        // an empty source would make the count comparison in testReindex() meaningless
+        assertNotEquals(0, count);
+    }
+
+    /** Runs the reindex and expects the destination index/type to hold exactly the source document count. */
+    @Test
+    public void testReindex() throws Exception {
+        ElasticsearchReindex reindex = new ElasticsearchReindex(testConfiguration);
+        reindex.run();
+
+        // count the documents that landed in the destination index/type
+        SearchRequestBuilder countRequest = testClient
+                .prepareSearch(testConfiguration.getDestination().getIndex())
+                .setTypes(testConfiguration.getDestination().getType());
+        SearchResponse countResponse = countRequest.execute().actionGet();
+
+        assertEquals(count, (int)countResponse.getHits().getTotalHits());
+    }
+}