You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/10/11 22:37:53 UTC

[03/17] git commit: changed how index, type, id get set to allow for absent metadata

changed how index, type, id get set to allow for absent metadata


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/63e2d428
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/63e2d428
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/63e2d428

Branch: refs/heads/STREAMS-170
Commit: 63e2d428aebc0b6619e6b4167ac8ad8f33709c75
Parents: 64c608f
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Thu Sep 11 12:36:00 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Thu Sep 11 12:36:00 2014 -0500

----------------------------------------------------------------------
 .../ElasticsearchPersistDeleter.java            | 48 ++++++++++++--------
 .../ElasticsearchPersistUpdater.java            | 46 ++++++++++---------
 .../ElasticsearchPersistWriter.java             | 17 -------
 3 files changed, 55 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/63e2d428/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java
index fece72e..9bb585c 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java
@@ -18,16 +18,15 @@
 
 package org.apache.streams.elasticsearch;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsPersistWriter;
 import org.elasticsearch.action.delete.DeleteRequest;
-import org.elasticsearch.action.update.UpdateRequest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Map;
+
 public class ElasticsearchPersistDeleter extends ElasticsearchPersistWriter implements StreamsPersistWriter {
 
     private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistDeleter.class);
@@ -43,25 +42,38 @@ public class ElasticsearchPersistDeleter extends ElasticsearchPersistWriter impl
     @Override
     public void write(StreamsDatum streamsDatum) {
 
-        Preconditions.checkNotNull(streamsDatum);
-        Preconditions.checkNotNull(streamsDatum.getDocument());
-        Preconditions.checkNotNull(streamsDatum.getMetadata());
-        Preconditions.checkNotNull(streamsDatum.getMetadata().get("id"));
+        if(streamsDatum == null || streamsDatum.getDocument() == null)
+            return;
+
+        LOGGER.debug("Update Document: {}", streamsDatum.getDocument());
 
-        String index;
-        String type;
-        String id;
+        Map<String, Object> metadata = streamsDatum.getMetadata();
 
-        index = Optional.fromNullable(
-                (String) streamsDatum.getMetadata().get("index"))
-                .or(config.getIndex());
-        type = Optional.fromNullable(
-                (String) streamsDatum.getMetadata().get("type"))
-                .or(config.getType());
-        id = (String) streamsDatum.getMetadata().get("id");
+        LOGGER.debug("Update Metadata: {}", metadata);
 
-        delete(index, type, id);
+        String index = null;
+        String type = null;
+        String id = streamsDatum.getId();
 
+        if( metadata != null && metadata.containsKey("index"))
+            index = (String) streamsDatum.getMetadata().get("index");
+        if( metadata != null && metadata.containsKey("type"))
+            type = (String) streamsDatum.getMetadata().get("type");
+        if( id == null && metadata != null && metadata.containsKey("id"))
+            id = (String) streamsDatum.getMetadata().get("id");
+
+        if(index == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) {
+            index = config.getIndex();
+        }
+        if(type == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) {
+            type = config.getType();
+        }
+
+        try {
+            delete(index, type, id);
+        } catch (Throwable e) {
+            LOGGER.warn("Unable to Delete Datum from ElasticSearch: {}", e.getMessage());
+        }
     }
 
     public void delete(String index, String type, String id) {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/63e2d428/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
index a60467d..b8584e5 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
@@ -25,7 +25,7 @@ import org.elasticsearch.action.update.UpdateRequest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-//import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
+import java.util.Map;
 
 public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter implements StreamsPersistWriter {
 
@@ -42,33 +42,37 @@ public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter impl
     @Override
     public void write(StreamsDatum streamsDatum) {
 
-        Preconditions.checkNotNull(streamsDatum);
-        Preconditions.checkNotNull(streamsDatum.getDocument());
-        Preconditions.checkNotNull(streamsDatum.getMetadata());
-        Preconditions.checkNotNull(streamsDatum.getMetadata().get("id"));
-
-        LOGGER.debug("Update Metadata: {}", streamsDatum.getMetadata());
+        if(streamsDatum == null || streamsDatum.getDocument() == null)
+            return;
 
         LOGGER.debug("Update Document: {}", streamsDatum.getDocument());
 
-        String index;
-        String type;
-        String id;
-        String json;
-        try {
+        Map<String, Object> metadata = streamsDatum.getMetadata();
 
-            json = OBJECT_MAPPER.writeValueAsString(streamsDatum.getDocument());
+        LOGGER.debug("Update Metadata: {}", metadata);
 
+        String index = null;
+        String type = null;
+        String id = streamsDatum.getId();
+
+        if( metadata != null && metadata.containsKey("index"))
             index = (String) streamsDatum.getMetadata().get("index");
+        if( metadata != null && metadata.containsKey("type"))
             type = (String) streamsDatum.getMetadata().get("type");
-            id = setId(streamsDatum);
-
-            if(index == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) {
-                index = config.getIndex();
-            }
-            if(type == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) {
-                type = config.getType();
-            }
+        if( id == null && metadata != null && metadata.containsKey("id"))
+            id = (String) streamsDatum.getMetadata().get("id");
+
+        if(index == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) {
+            index = config.getIndex();
+        }
+        if(type == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) {
+            type = config.getType();
+        }
+
+        String json;
+        try {
+
+            json = OBJECT_MAPPER.writeValueAsString(streamsDatum.getDocument());
 
             LOGGER.debug("Attempt Update: ({},{},{}) {}", index, type, id, json);
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/63e2d428/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
index bfb21f5..169c941 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
@@ -19,12 +19,8 @@
 
 package org.apache.streams.elasticsearch;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.core.TreeNode;
-import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.*;
@@ -43,7 +39,6 @@ import org.elasticsearch.common.joda.time.DateTime;
 import org.elasticsearch.common.settings.ImmutableSettings;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import com.fasterxml.jackson.core.JsonParser;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -173,18 +168,6 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt
         }
     }
 
-    private String setId(StreamsDatum streamsDatum) {
-        String id = Optional.fromNullable(
-                (String) streamsDatum.getMetadata().get("id"))
-                .orNull();
-
-        if(id == null)
-            id = Optional.fromNullable(streamsDatum.getId())
-                    .orNull();
-
-        return id;
-    }
-
     private String convertAndAppendMetadata(StreamsDatum streamsDatum) throws IOException {
         Object object = streamsDatum.getDocument();