You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jp...@apache.org on 2016/02/03 00:36:08 UTC

[3/5] nifi git commit: Added path home property and unit test to elasticsearch processor in support of the node client

Added path home property and unit test to elasticsearch processor in support of the node client


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/0c137bc2
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/0c137bc2
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/0c137bc2

Branch: refs/heads/master
Commit: 0c137bc22d18b23b121e42de0dd03297fffa2b5f
Parents: 943d0a6
Author: scarpacci <mi...@gmail.com>
Authored: Wed Dec 30 09:27:49 2015 -0800
Committer: Matt Burgess <ma...@gmail.com>
Committed: Tue Feb 2 17:26:40 2016 -0500

----------------------------------------------------------------------
 .../AbstractElasticsearchProcessor.java         | 32 ++++++-
 .../elasticsearch/PutElasticsearch.java         |  1 +
 .../elasticsearch/TestPutElasticsearch.java     | 90 ++++++++++++++++++--
 3 files changed, 114 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/0c137bc2/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchProcessor.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchProcessor.java
index 9573178..71a116b 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchProcessor.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchProcessor.java
@@ -81,6 +81,16 @@ public abstract class AbstractElasticsearchProcessor extends AbstractProcessor {
             .required(false)
             .addValidator(new ElasticsearchClientValidator())
             .build();
+
+    protected static final PropertyDescriptor PATH_HOME = new PropertyDescriptor.Builder()
+            .name("ElasticSearch Path Home")
+            .description("ElasticSearch node client requires that path.home be set. For example, "
+                        + "/usr/share/elasticsearch or /usr/local/opt/elasticsearch for homebrew intall "
+                        + "https://www.elastic.co/guide/en/elasticsearch/reference/current/setup-dir-layout.html")
+            .required(false)
+            .addValidator(new ElasticsearchClientValidator())
+            .build();
+
     protected static final PropertyDescriptor PING_TIMEOUT = new PropertyDescriptor.Builder()
             .name("ElasticSearch Ping Timeout")
             .description("The ping timeout used to determine when a node is unreachable.  " +
@@ -89,6 +99,7 @@ public abstract class AbstractElasticsearchProcessor extends AbstractProcessor {
             .defaultValue("5s")
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
+
     protected static final PropertyDescriptor SAMPLER_INTERVAL = new PropertyDescriptor.Builder()
             .name("Sampler Interval")
             .description("Node sampler interval. For example, 5s (5 seconds) If non-local recommended is 30s")
@@ -144,7 +155,14 @@ public abstract class AbstractElasticsearchProcessor extends AbstractProcessor {
                 }
                 esClient = transportClient;
             } else if ("node".equals(clusterType)) {
-                esClient = NodeBuilder.nodeBuilder().clusterName(clusterName).node().client();
+
+                final String pathHome = context.getProperty(PATH_HOME).toString();
+                //create new node client
+                Settings settings = Settings.settingsBuilder()
+                        .put("path.home", pathHome)
+                        .build();
+
+                esClient = NodeBuilder.nodeBuilder().clusterName(clusterName).settings(settings).node().client();
             }
         } catch (Exception e) {
             log.error("Failed to create Elasticsearch client due to {}", new Object[]{e}, e);
@@ -205,7 +223,7 @@ public abstract class AbstractElasticsearchProcessor extends AbstractProcessor {
     }
 
     /**
-     * A custom validator for the Elasticsearch properties list. For example, the hostnames property doesn't need to
+     * A custom validator for the ElasticSearch properties list. For example, the hostnames property doesn't need to
      * be filled in for a Node client, as it joins the cluster by name. Alternatively if a Transport client
      */
     protected static class ElasticsearchClientValidator implements Validator {
@@ -220,6 +238,16 @@ public abstract class AbstractElasticsearchProcessor extends AbstractProcessor {
                             CLIENT_TYPE.getName(), clientTypeProperty.getValue(), context);
                 }
             }
+
+            // Only validate Path home if client type == Node
+            if (PATH_HOME.getName().equals(subject)) {
+                PropertyValue clientTypeProperty = context.getProperty(CLIENT_TYPE);
+                if (NODE_CLIENT.getValue().equals(clientTypeProperty.getValue())) {
+                    return StandardValidators.NON_EMPTY_VALIDATOR.validate(
+                            CLIENT_TYPE.getName(), clientTypeProperty.getValue(), context);
+                }
+            }
+
             return VALID.validate(subject, input, context);
         }
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/0c137bc2/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java
index 003aa30..43f3a6b 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java
@@ -116,6 +116,7 @@ public class PutElasticsearch extends AbstractElasticsearchProcessor {
         descriptors.add(CLIENT_TYPE);
         descriptors.add(CLUSTER_NAME);
         descriptors.add(HOSTS);
+        descriptors.add(PATH_HOME);
         descriptors.add(PING_TIMEOUT);
         descriptors.add(SAMPLER_INTERVAL);
         descriptors.add(ID_ATTRIBUTE);

http://git-wip-us.apache.org/repos/asf/nifi/blob/0c137bc2/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java
index 68a0a78..6af8fd2 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java
@@ -21,8 +21,6 @@ import com.google.gson.JsonParser;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.MockProcessContext;
-import org.apache.nifi.util.MockProcessorInitializationContext;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.elasticsearch.action.ActionListener;
@@ -32,11 +30,8 @@ import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.index.IndexAction;
 import org.elasticsearch.action.index.IndexRequestBuilder;
-import org.elasticsearch.action.support.AbstractListenableActionFuture;
 import org.elasticsearch.action.support.AdapterActionFuture;
-import org.elasticsearch.action.support.PlainListenableActionFuture;
 import org.elasticsearch.client.Client;
-import org.elasticsearch.common.unit.TimeValue;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Ignore;
@@ -125,6 +120,35 @@ public class TestPutElasticsearch {
         out.assertAttributeEquals("tweet_id", "28039652140");
     }
 
+    @Test
+    public void testPutElasticSearchOnTriggerNode() throws IOException {
+        runner = TestRunners.newTestRunner(new ElasticsearchTestProcessor(false)); // no failures
+        runner.setValidateExpressionUsage(false);
+        runner.setProperty(AbstractElasticsearchProcessor.CLIENT_TYPE,"node");
+        runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, "elasticsearch_brew");
+        runner.setProperty(AbstractElasticsearchProcessor.HOSTS, "127.0.0.1:9300");
+        runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s");
+        runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, "5s");
+
+        runner.setProperty(PutElasticsearch.INDEX, "tweet");
+        runner.assertNotValid();
+        runner.setProperty(PutElasticsearch.TYPE, "status");
+        runner.setProperty(PutElasticsearch.BATCH_SIZE, "1");
+        runner.assertNotValid();
+        runner.setProperty(PutElasticsearch.ID_ATTRIBUTE, "tweet_id");
+        runner.assertValid();
+
+        runner.enqueue(twitterExample, new HashMap<String, String>() {{
+            put("tweet_id", "28039652141");
+        }});
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(PutElasticsearch.REL_SUCCESS, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearch.REL_SUCCESS).get(0);
+        assertNotNull(out);
+        out.assertAttributeEquals("tweet_id", "28039652141");
+    }
+
     /**
      * A Test class that extends the processor in order to inject/mock behavior
      */
@@ -197,14 +221,56 @@ public class TestPutElasticsearch {
         }.getClass().getEnclosingMethod().getName());
         final TestRunner runner = TestRunners.newTestRunner(new PutElasticsearch());
         runner.setValidateExpressionUsage(false);
+
         //Local Cluster - Mac pulled from brew
+        runner.setProperty(AbstractElasticsearchProcessor.CLIENT_TYPE, "transport");
         runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, "elasticsearch_brew");
         runner.setProperty(AbstractElasticsearchProcessor.HOSTS, "127.0.0.1:9300");
         runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s");
         runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, "5s");
+
+        runner.setProperty(PutElasticsearch.INDEX, "tweet");
+        runner.setProperty(PutElasticsearch.BATCH_SIZE, "1");
+
+        runner.setProperty(PutElasticsearch.TYPE, "status");
+        runner.setProperty(PutElasticsearch.ID_ATTRIBUTE, "tweet_id");
+        runner.assertValid();
+
+        runner.enqueue(twitterExample, new HashMap<String, String>() {{
+            put("tweet_id", "28039652140");
+        }});
+
+
+        runner.enqueue(twitterExample);
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(PutElasticsearch.REL_SUCCESS, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearch.REL_SUCCESS).get(0);
+
+    }
+
+    @Test
+    @Ignore("Comment this out if you want to run against local or test ES")
+    public void testPutElasticSearchBasicNode() throws IOException {
+        System.out.println("Starting test " + new Object() {
+        }.getClass().getEnclosingMethod().getName());
+        final TestRunner runner = TestRunners.newTestRunner(new PutElasticsearch());
+        runner.setValidateExpressionUsage(false);
+
+        //Local Cluster - Mac pulled from brew
+        runner.setProperty(AbstractElasticsearchProcessor.CLIENT_TYPE, "node");
+        runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, "elasticsearch_brew");
+        runner.setProperty(AbstractElasticsearchProcessor.PATH_HOME, "/usr/local/opt/elasticsearch");
         runner.setProperty(PutElasticsearch.INDEX, "tweet");
         runner.setProperty(PutElasticsearch.BATCH_SIZE, "1");
 
+        runner.setProperty(PutElasticsearch.TYPE, "status");
+        runner.setProperty(PutElasticsearch.ID_ATTRIBUTE, "tweet_id");
+        runner.assertValid();
+
+        runner.enqueue(twitterExample, new HashMap<String, String>() {{
+            put("tweet_id", "28039652141");
+        }});
 
         runner.enqueue(twitterExample);
         runner.run(1, true, true);
@@ -221,7 +287,9 @@ public class TestPutElasticsearch {
         }.getClass().getEnclosingMethod().getName());
         final TestRunner runner = TestRunners.newTestRunner(new PutElasticsearch());
         runner.setValidateExpressionUsage(false);
+
         //Local Cluster - Mac pulled from brew
+        runner.setProperty(AbstractElasticsearchProcessor.CLIENT_TYPE, "transport");
         runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, "elasticsearch_brew");
         runner.setProperty(AbstractElasticsearchProcessor.HOSTS, "127.0.0.1:9300");
         runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s");
@@ -229,6 +297,11 @@ public class TestPutElasticsearch {
         runner.setProperty(PutElasticsearch.INDEX, "tweet");
         runner.setProperty(PutElasticsearch.BATCH_SIZE, "100");
 
+        runner.setProperty(PutElasticsearch.TYPE, "status");
+        runner.setProperty(PutElasticsearch.ID_ATTRIBUTE, "tweet_id");
+        runner.assertValid();
+
+
         JsonParser parser = new JsonParser();
         JsonObject json;
         String message = convertStreamToString(twitterExample);
@@ -237,8 +310,11 @@ public class TestPutElasticsearch {
             json = parser.parse(message).getAsJsonObject();
             String id = json.get("id").getAsString();
             long newId = Long.parseLong(id) + i;
-            json.addProperty("id", newId);
-            runner.enqueue(message.getBytes());
+            final String newStrId = Long.toString(newId);
+            //json.addProperty("id", newId);
+            runner.enqueue(message.getBytes(), new HashMap<String, String>() {{
+                put("tweet_id", newStrId);
+            }});
 
         }