You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/09/18 00:01:37 UTC

[1/6] git commit: writer should work without metadata

Repository: incubator-streams
Updated Branches:
  refs/heads/master 35a8fbf4b -> b4d71241b


writer should work without metadata


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/df00e4a0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/df00e4a0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/df00e4a0

Branch: refs/heads/master
Commit: df00e4a06376bacda755618595922cd3382d121a
Parents: 3ee5175
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Thu Sep 11 11:32:53 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Thu Sep 11 11:32:53 2014 -0500

----------------------------------------------------------------------
 .../elasticsearch/ElasticsearchPersistWriter.java    | 15 ++++++++++++---
 1 file changed, 12 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/df00e4a0/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
index 664dd24..bfb21f5 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
@@ -144,9 +144,18 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt
 
         checkForBackOff();
 
-        String index = (String) streamsDatum.getMetadata().get("index");
-        String type = (String) streamsDatum.getMetadata().get("type");
-        String id = setId(streamsDatum);
+        Map<String, Object> metadata = streamsDatum.getMetadata();
+
+        String index = null;
+        String type = null;
+        String id = streamsDatum.getId();
+
+        if( metadata != null && metadata.containsKey("index"))
+            index = (String) streamsDatum.getMetadata().get("index");
+        if( metadata != null && metadata.containsKey("type"))
+            type = (String) streamsDatum.getMetadata().get("type");
+        if( id == null && metadata != null && metadata.containsKey("id"))
+            id = (String) streamsDatum.getMetadata().get("id");
 
         if(index == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) {
             index = config.getIndex();


[5/6] git commit: refactored to move duplicated code to new protected methods in ElasticsearchPersistWriter

Posted by sb...@apache.org.
refactored to move duplicated code to new protected methods in ElasticsearchPersistWriter


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/5bf87552
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/5bf87552
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/5bf87552

Branch: refs/heads/master
Commit: 5bf8755236ff3fe53eae7b0f131bfbf57647ba29
Parents: 9c98a45
Author: sblackmon <sb...@apache.org>
Authored: Tue Sep 16 13:59:20 2014 -0500
Committer: sblackmon <sb...@apache.org>
Committed: Tue Sep 16 13:59:20 2014 -0500

----------------------------------------------------------------------
 .../ElasticsearchPersistDeleter.java            | 20 +------
 .../ElasticsearchPersistUpdater.java            | 20 +------
 .../ElasticsearchPersistWriter.java             | 62 ++++++++++++++------
 3 files changed, 51 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5bf87552/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java
index 95004f5..319cece 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java
@@ -51,23 +51,9 @@ public class ElasticsearchPersistDeleter extends ElasticsearchPersistWriter impl
 
         LOGGER.debug("Delete Metadata: {}", metadata);
 
-        String index = null;
-        String type = null;
-        String id = streamsDatum.getId();
-
-        if( metadata != null && metadata.containsKey("index"))
-            index = (String) streamsDatum.getMetadata().get("index");
-        if( metadata != null && metadata.containsKey("type"))
-            type = (String) streamsDatum.getMetadata().get("type");
-        if( id == null && metadata != null && metadata.containsKey("id"))
-            id = (String) streamsDatum.getMetadata().get("id");
-
-        if(index == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) {
-            index = config.getIndex();
-        }
-        if(type == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) {
-            type = config.getType();
-        }
+        String index = getIndex(metadata, config);
+        String type = getType(metadata, config);
+        String id = getId(streamsDatum);
 
         try {
             delete(index, type, id);

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5bf87552/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
index d0e8acd..872c65e 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
@@ -51,23 +51,9 @@ public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter impl
 
         LOGGER.debug("Update Metadata: {}", metadata);
 
-        String index = null;
-        String type = null;
-        String id = streamsDatum.getId();
-
-        if( metadata != null && metadata.containsKey("index"))
-            index = (String) streamsDatum.getMetadata().get("index");
-        if( metadata != null && metadata.containsKey("type"))
-            type = (String) streamsDatum.getMetadata().get("type");
-        if( id == null && metadata != null && metadata.containsKey("id"))
-            id = (String) streamsDatum.getMetadata().get("id");
-
-        if(index == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) {
-            index = config.getIndex();
-        }
-        if(type == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) {
-            type = config.getType();
-        }
+        String index = getIndex(metadata, config);
+        String type = getType(metadata, config);
+        String id = getId(streamsDatum);
 
         String json;
         try {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5bf87552/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
index ca643b9..4ec3315 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
@@ -145,23 +145,9 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt
 
         LOGGER.debug("Write Metadata: {}", metadata);
 
-        String index = null;
-        String type = null;
-        String id = streamsDatum.getId();
-
-        if( metadata != null && metadata.containsKey("index"))
-            index = (String) streamsDatum.getMetadata().get("index");
-        if( metadata != null && metadata.containsKey("type"))
-            type = (String) streamsDatum.getMetadata().get("type");
-        if( id == null && metadata != null && metadata.containsKey("id"))
-            id = (String) streamsDatum.getMetadata().get("id");
-
-        if(index == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) {
-            index = config.getIndex();
-        }
-        if(type == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) {
-            type = config.getType();
-        }
+        String index = getIndex(metadata, config);
+        String type = getType(metadata, config);
+        String id = getId(streamsDatum);
 
         try {
             add(index, type, id,
@@ -506,4 +492,46 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt
                 MEGABYTE_FORMAT.format(sizeInBytes / (double) (1024 * 1024)), NUMBER_FORMAT.format(passed), NUMBER_FORMAT.format(failed), NUMBER_FORMAT.format(millis),
                 MEGABYTE_FORMAT.format((double) totalSizeInBytes.get() / (double) (1024 * 1024)), NUMBER_FORMAT.format(totalOk), NUMBER_FORMAT.format(totalFailed), NUMBER_FORMAT.format(totalSeconds), NUMBER_FORMAT.format(getTotalOutstanding()));
     }
+
+    protected String getIndex(Map<String, Object> metadata, ElasticsearchWriterConfiguration config) {
+
+        String index = null;
+
+        if( metadata != null && metadata.containsKey("index"))
+            index = (String) metadata.get("index");
+
+        if(index == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) {
+            index = config.getIndex();
+        }
+
+        return index;
+    }
+
+    protected String getType(Map<String, Object> metadata, ElasticsearchWriterConfiguration config) {
+
+        String type = null;
+
+        if( metadata != null && metadata.containsKey("type"))
+            type = (String) metadata.get("type");
+
+        if(type == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) {
+            type = config.getType();
+        }
+
+
+        return type;
+    }
+
+    protected String getId(StreamsDatum datum) {
+
+        String id = datum.getId();
+
+        Map<String, Object> metadata = datum.getMetadata();
+
+        if( id == null && metadata != null && metadata.containsKey("id"))
+            id = (String) datum.getMetadata().get("id");
+
+        return id;
+    }
+
 }


[6/6] git commit: Merge branch 'STREAMS-164'

Posted by sb...@apache.org.
Merge branch 'STREAMS-164'


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/b4d71241
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/b4d71241
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/b4d71241

Branch: refs/heads/master
Commit: b4d71241b36806ddc02ffaa90892dbf4e93f254f
Parents: 35a8fbf 5bf8755
Author: sblackmon <sb...@apache.org>
Authored: Wed Sep 17 16:44:46 2014 -0500
Committer: sblackmon <sb...@apache.org>
Committed: Wed Sep 17 16:44:46 2014 -0500

----------------------------------------------------------------------
 .../ElasticsearchConfigurator.java              | 41 ++++-------
 .../ElasticsearchPersistDeleter.java            | 34 +++++----
 .../ElasticsearchPersistUpdater.java            | 66 +++++------------
 .../ElasticsearchPersistWriter.java             | 76 +++++++++++++-------
 4 files changed, 98 insertions(+), 119 deletions(-)
----------------------------------------------------------------------



[3/6] git commit: changed how index, type, id get set to allow for absent metadata

Posted by sb...@apache.org.
changed how index, type, id get set to allow for absent metadata


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/63e2d428
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/63e2d428
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/63e2d428

Branch: refs/heads/master
Commit: 63e2d428aebc0b6619e6b4167ac8ad8f33709c75
Parents: 64c608f
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Thu Sep 11 12:36:00 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Thu Sep 11 12:36:00 2014 -0500

----------------------------------------------------------------------
 .../ElasticsearchPersistDeleter.java            | 48 ++++++++++++--------
 .../ElasticsearchPersistUpdater.java            | 46 ++++++++++---------
 .../ElasticsearchPersistWriter.java             | 17 -------
 3 files changed, 55 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/63e2d428/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java
index fece72e..9bb585c 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java
@@ -18,16 +18,15 @@
 
 package org.apache.streams.elasticsearch;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsPersistWriter;
 import org.elasticsearch.action.delete.DeleteRequest;
-import org.elasticsearch.action.update.UpdateRequest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Map;
+
 public class ElasticsearchPersistDeleter extends ElasticsearchPersistWriter implements StreamsPersistWriter {
 
     private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistDeleter.class);
@@ -43,25 +42,38 @@ public class ElasticsearchPersistDeleter extends ElasticsearchPersistWriter impl
     @Override
     public void write(StreamsDatum streamsDatum) {
 
-        Preconditions.checkNotNull(streamsDatum);
-        Preconditions.checkNotNull(streamsDatum.getDocument());
-        Preconditions.checkNotNull(streamsDatum.getMetadata());
-        Preconditions.checkNotNull(streamsDatum.getMetadata().get("id"));
+        if(streamsDatum == null || streamsDatum.getDocument() == null)
+            return;
+
+        LOGGER.debug("Update Document: {}", streamsDatum.getDocument());
 
-        String index;
-        String type;
-        String id;
+        Map<String, Object> metadata = streamsDatum.getMetadata();
 
-        index = Optional.fromNullable(
-                (String) streamsDatum.getMetadata().get("index"))
-                .or(config.getIndex());
-        type = Optional.fromNullable(
-                (String) streamsDatum.getMetadata().get("type"))
-                .or(config.getType());
-        id = (String) streamsDatum.getMetadata().get("id");
+        LOGGER.debug("Update Metadata: {}", metadata);
 
-        delete(index, type, id);
+        String index = null;
+        String type = null;
+        String id = streamsDatum.getId();
 
+        if( metadata != null && metadata.containsKey("index"))
+            index = (String) streamsDatum.getMetadata().get("index");
+        if( metadata != null && metadata.containsKey("type"))
+            type = (String) streamsDatum.getMetadata().get("type");
+        if( id == null && metadata != null && metadata.containsKey("id"))
+            id = (String) streamsDatum.getMetadata().get("id");
+
+        if(index == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) {
+            index = config.getIndex();
+        }
+        if(type == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) {
+            type = config.getType();
+        }
+
+        try {
+            delete(index, type, id);
+        } catch (Throwable e) {
+            LOGGER.warn("Unable to Delete Datum from ElasticSearch: {}", e.getMessage());
+        }
     }
 
     public void delete(String index, String type, String id) {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/63e2d428/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
index a60467d..b8584e5 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
@@ -25,7 +25,7 @@ import org.elasticsearch.action.update.UpdateRequest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-//import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
+import java.util.Map;
 
 public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter implements StreamsPersistWriter {
 
@@ -42,33 +42,37 @@ public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter impl
     @Override
     public void write(StreamsDatum streamsDatum) {
 
-        Preconditions.checkNotNull(streamsDatum);
-        Preconditions.checkNotNull(streamsDatum.getDocument());
-        Preconditions.checkNotNull(streamsDatum.getMetadata());
-        Preconditions.checkNotNull(streamsDatum.getMetadata().get("id"));
-
-        LOGGER.debug("Update Metadata: {}", streamsDatum.getMetadata());
+        if(streamsDatum == null || streamsDatum.getDocument() == null)
+            return;
 
         LOGGER.debug("Update Document: {}", streamsDatum.getDocument());
 
-        String index;
-        String type;
-        String id;
-        String json;
-        try {
+        Map<String, Object> metadata = streamsDatum.getMetadata();
 
-            json = OBJECT_MAPPER.writeValueAsString(streamsDatum.getDocument());
+        LOGGER.debug("Update Metadata: {}", metadata);
 
+        String index = null;
+        String type = null;
+        String id = streamsDatum.getId();
+
+        if( metadata != null && metadata.containsKey("index"))
             index = (String) streamsDatum.getMetadata().get("index");
+        if( metadata != null && metadata.containsKey("type"))
             type = (String) streamsDatum.getMetadata().get("type");
-            id = setId(streamsDatum);
-
-            if(index == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) {
-                index = config.getIndex();
-            }
-            if(type == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) {
-                type = config.getType();
-            }
+        if( id == null && metadata != null && metadata.containsKey("id"))
+            id = (String) streamsDatum.getMetadata().get("id");
+
+        if(index == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) {
+            index = config.getIndex();
+        }
+        if(type == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) {
+            type = config.getType();
+        }
+
+        String json;
+        try {
+
+            json = OBJECT_MAPPER.writeValueAsString(streamsDatum.getDocument());
 
             LOGGER.debug("Attempt Update: ({},{},{}) {}", index, type, id, json);
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/63e2d428/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
index bfb21f5..169c941 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
@@ -19,12 +19,8 @@
 
 package org.apache.streams.elasticsearch;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.core.TreeNode;
-import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.*;
@@ -43,7 +39,6 @@ import org.elasticsearch.common.joda.time.DateTime;
 import org.elasticsearch.common.settings.ImmutableSettings;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import com.fasterxml.jackson.core.JsonParser;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -173,18 +168,6 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt
         }
     }
 
-    private String setId(StreamsDatum streamsDatum) {
-        String id = Optional.fromNullable(
-                (String) streamsDatum.getMetadata().get("id"))
-                .orNull();
-
-        if(id == null)
-            id = Optional.fromNullable(streamsDatum.getId())
-                    .orNull();
-
-        return id;
-    }
-
     private String convertAndAppendMetadata(StreamsDatum streamsDatum) throws IOException {
         Object object = streamsDatum.getDocument();
 


[4/6] git commit: more consistent logging / exception handling

Posted by sb...@apache.org.
more consistent logging / exception handling


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/9c98a450
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/9c98a450
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/9c98a450

Branch: refs/heads/master
Commit: 9c98a450115f7e136d002c2df311aeebc3f860fe
Parents: 63e2d42
Author: sblackmon <sb...@apache.org>
Authored: Fri Sep 12 16:01:39 2014 -0500
Committer: sblackmon <sb...@apache.org>
Committed: Fri Sep 12 16:01:39 2014 -0500

----------------------------------------------------------------------
 .../streams/elasticsearch/ElasticsearchPersistDeleter.java     | 6 +++---
 .../streams/elasticsearch/ElasticsearchPersistUpdater.java     | 6 ++----
 .../streams/elasticsearch/ElasticsearchPersistWriter.java      | 4 ++++
 3 files changed, 9 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9c98a450/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java
index 9bb585c..95004f5 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java
@@ -45,11 +45,11 @@ public class ElasticsearchPersistDeleter extends ElasticsearchPersistWriter impl
         if(streamsDatum == null || streamsDatum.getDocument() == null)
             return;
 
-        LOGGER.debug("Update Document: {}", streamsDatum.getDocument());
+        LOGGER.debug("Delete Document: {}", streamsDatum.getDocument());
 
         Map<String, Object> metadata = streamsDatum.getMetadata();
 
-        LOGGER.debug("Update Metadata: {}", metadata);
+        LOGGER.debug("Delete Metadata: {}", metadata);
 
         String index = null;
         String type = null;
@@ -72,7 +72,7 @@ public class ElasticsearchPersistDeleter extends ElasticsearchPersistWriter impl
         try {
             delete(index, type, id);
         } catch (Throwable e) {
-            LOGGER.warn("Unable to Delete Datum from ElasticSearch: {}", e.getMessage());
+            LOGGER.warn("Unable to Delete Document from ElasticSearch: {}", e.getMessage());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9c98a450/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
index b8584e5..d0e8acd 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
@@ -78,10 +78,8 @@ public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter impl
 
             update(index, type, id, json);
 
-        } catch (Exception e) {
-            LOGGER.warn("Exception: {} ", e.getMessage());
-        } catch (Error e) {
-            LOGGER.warn("Error: {} ", e.getMessage());
+        } catch (Throwable e) {
+            LOGGER.warn("Unable to Update Document in ElasticSearch: {}", e.getMessage());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9c98a450/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
index 169c941..ca643b9 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
@@ -139,8 +139,12 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt
 
         checkForBackOff();
 
+        LOGGER.debug("Write Document: {}", streamsDatum.getDocument());
+
         Map<String, Object> metadata = streamsDatum.getMetadata();
 
+        LOGGER.debug("Write Metadata: {}", metadata);
+
         String index = null;
         String type = null;
         String id = streamsDatum.getId();


[2/6] git commit: updater should work without metadata; general cleanup of configurator

Posted by sb...@apache.org.
updater should work without metadata
general cleanup of configurator


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/64c608f1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/64c608f1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/64c608f1

Branch: refs/heads/master
Commit: 64c608f10d91435b5b127726d6e670daaa0cb6be
Parents: df00e4a
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Thu Sep 11 12:07:33 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Thu Sep 11 12:07:33 2014 -0500

----------------------------------------------------------------------
 .../ElasticsearchConfigurator.java              | 41 ++++---------
 .../ElasticsearchPersistUpdater.java            | 62 ++++++++------------
 2 files changed, 36 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/64c608f1/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
index 1c66789..439b5de 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
@@ -24,10 +24,6 @@ import com.typesafe.config.ConfigRenderOptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
 /**
  * Converts a {@link com.typesafe.config.Config} element into an instance of ElasticSearchConfiguration
  */
@@ -38,39 +34,28 @@ public class ElasticsearchConfigurator {
     private final static ObjectMapper mapper = new ObjectMapper();
 
     public static ElasticsearchConfiguration detectConfiguration(Config elasticsearch) {
-        List<String> hosts = elasticsearch.getStringList("hosts");
-        Long port = elasticsearch.getLong("port");
-        String clusterName = elasticsearch.getString("clusterName");
 
-        ElasticsearchConfiguration elasticsearchConfiguration = new ElasticsearchConfiguration();
+        ElasticsearchConfiguration elasticsearchConfiguration = null;
 
-        elasticsearchConfiguration.setHosts(hosts);
-        elasticsearchConfiguration.setPort(port);
-        elasticsearchConfiguration.setClusterName(clusterName);
+        try {
+            elasticsearchConfiguration = mapper.readValue(elasticsearch.root().render(ConfigRenderOptions.concise()), ElasticsearchConfiguration.class);
+        } catch (Exception e) {
+            e.printStackTrace();
+            LOGGER.warn("Could not parse elasticsearchconfiguration");
+        }
 
         return elasticsearchConfiguration;
     }
 
     public static ElasticsearchReaderConfiguration detectReaderConfiguration(Config elasticsearch) {
 
-        ElasticsearchConfiguration elasticsearchConfiguration = detectConfiguration(elasticsearch);
-        ElasticsearchReaderConfiguration elasticsearchReaderConfiguration = mapper.convertValue(elasticsearchConfiguration, ElasticsearchReaderConfiguration.class);
+        ElasticsearchReaderConfiguration elasticsearchReaderConfiguration = null;
 
-        List<String> indexes = elasticsearch.getStringList("indexes");
-        List<String> types = elasticsearch.getStringList("types");
-
-        elasticsearchReaderConfiguration.setIndexes(indexes);
-        elasticsearchReaderConfiguration.setTypes(types);
-
-        if( elasticsearch.hasPath("_search") ) {
-            LOGGER.info("_search supplied by config");
-            Config searchConfig = elasticsearch.getConfig("_search");
-            try {
-                elasticsearchReaderConfiguration.setSearch(mapper.readValue(searchConfig.root().render(ConfigRenderOptions.concise()), Map.class));
-            } catch (IOException e) {
-                e.printStackTrace();
-                LOGGER.warn("Could not parse _search supplied by config");
-            }
+        try {
+            elasticsearchReaderConfiguration = mapper.readValue(elasticsearch.root().render(ConfigRenderOptions.concise()), ElasticsearchReaderConfiguration.class);
+        } catch (Exception e) {
+            e.printStackTrace();
+            LOGGER.warn("Could not parse elasticsearchconfiguration");
         }
 
         return elasticsearchReaderConfiguration;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/64c608f1/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
index dbf7d25..a60467d 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
@@ -18,42 +18,13 @@
 
 package org.apache.streams.elasticsearch;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Objects;
-import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
-import com.typesafe.config.Config;
-import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsPersistWriter;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
-import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
-import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
-import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
-import org.elasticsearch.action.bulk.BulkItemResponse;
-import org.elasticsearch.action.bulk.BulkRequestBuilder;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.search.SearchRequestBuilder;
 import org.elasticsearch.action.update.UpdateRequest;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.common.settings.ImmutableSettings;
-import org.elasticsearch.index.query.IdsQueryBuilder;
-import org.elasticsearch.search.SearchHit;
-import org.elasticsearch.search.SearchHits;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.Closeable;
-import java.io.Flushable;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.text.DecimalFormat;
-import java.text.NumberFormat;
-import java.util.*;
-
 //import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
 
 public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter implements StreamsPersistWriter {
@@ -76,6 +47,10 @@ public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter impl
         Preconditions.checkNotNull(streamsDatum.getMetadata());
         Preconditions.checkNotNull(streamsDatum.getMetadata().get("id"));
 
+        LOGGER.debug("Update Metadata: {}", streamsDatum.getMetadata());
+
+        LOGGER.debug("Update Document: {}", streamsDatum.getDocument());
+
         String index;
         String type;
         String id;
@@ -84,19 +59,25 @@ public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter impl
 
             json = OBJECT_MAPPER.writeValueAsString(streamsDatum.getDocument());
 
-            index = Optional.fromNullable(
-                    (String) streamsDatum.getMetadata().get("index"))
-                    .or(config.getIndex());
-            type = Optional.fromNullable(
-                    (String) streamsDatum.getMetadata().get("type"))
-                    .or(config.getType());
-            id = (String) streamsDatum.getMetadata().get("id");
+            index = (String) streamsDatum.getMetadata().get("index");
+            type = (String) streamsDatum.getMetadata().get("type");
+            id = setId(streamsDatum);
 
-            update(index, type, id, json);
+            if(index == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) {
+                index = config.getIndex();
+            }
+            if(type == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) {
+                type = config.getType();
+            }
 
-        } catch (JsonProcessingException e) {
-            LOGGER.warn("{} {}", e.getLocation(), e.getMessage());
+            LOGGER.debug("Attempt Update: ({},{},{}) {}", index, type, id, json);
 
+            update(index, type, id, json);
+
+        } catch (Exception e) {
+            LOGGER.warn("Exception: {} ", e.getMessage());
+        } catch (Error e) {
+            LOGGER.warn("Error: {} ", e.getMessage());
         }
     }
 
@@ -113,6 +94,9 @@ public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter impl
                 .id(id)
                 .doc(json);
 
+        // upsert: if no document with this id exists yet, index the supplied doc as a new document
+        updateRequest.docAsUpsert(true);
+
         add(updateRequest);
 
     }