You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/10/11 22:37:52 UTC

[02/17] git commit: updater should work without metadata; general cleanup of configurator

updater should work without metadata
general cleanup of configurator


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/64c608f1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/64c608f1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/64c608f1

Branch: refs/heads/STREAMS-170
Commit: 64c608f10d91435b5b127726d6e670daaa0cb6be
Parents: df00e4a
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Thu Sep 11 12:07:33 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Thu Sep 11 12:07:33 2014 -0500

----------------------------------------------------------------------
 .../ElasticsearchConfigurator.java              | 41 ++++---------
 .../ElasticsearchPersistUpdater.java            | 62 ++++++++------------
 2 files changed, 36 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/64c608f1/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
index 1c66789..439b5de 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
@@ -24,10 +24,6 @@ import com.typesafe.config.ConfigRenderOptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
 /**
  * Converts a {@link com.typesafe.config.Config} element into an instance of ElasticSearchConfiguration
  */
@@ -38,39 +34,28 @@ public class ElasticsearchConfigurator {
     private final static ObjectMapper mapper = new ObjectMapper();
 
     public static ElasticsearchConfiguration detectConfiguration(Config elasticsearch) {
-        List<String> hosts = elasticsearch.getStringList("hosts");
-        Long port = elasticsearch.getLong("port");
-        String clusterName = elasticsearch.getString("clusterName");
 
-        ElasticsearchConfiguration elasticsearchConfiguration = new ElasticsearchConfiguration();
+        ElasticsearchConfiguration elasticsearchConfiguration = null;
 
-        elasticsearchConfiguration.setHosts(hosts);
-        elasticsearchConfiguration.setPort(port);
-        elasticsearchConfiguration.setClusterName(clusterName);
+        try {
+            elasticsearchConfiguration = mapper.readValue(elasticsearch.root().render(ConfigRenderOptions.concise()), ElasticsearchConfiguration.class);
+        } catch (Exception e) {
+            e.printStackTrace();
+            LOGGER.warn("Could not parse elasticsearchconfiguration");
+        }
 
         return elasticsearchConfiguration;
     }
 
     public static ElasticsearchReaderConfiguration detectReaderConfiguration(Config elasticsearch) {
 
-        ElasticsearchConfiguration elasticsearchConfiguration = detectConfiguration(elasticsearch);
-        ElasticsearchReaderConfiguration elasticsearchReaderConfiguration = mapper.convertValue(elasticsearchConfiguration, ElasticsearchReaderConfiguration.class);
+        ElasticsearchReaderConfiguration elasticsearchReaderConfiguration = null;
 
-        List<String> indexes = elasticsearch.getStringList("indexes");
-        List<String> types = elasticsearch.getStringList("types");
-
-        elasticsearchReaderConfiguration.setIndexes(indexes);
-        elasticsearchReaderConfiguration.setTypes(types);
-
-        if( elasticsearch.hasPath("_search") ) {
-            LOGGER.info("_search supplied by config");
-            Config searchConfig = elasticsearch.getConfig("_search");
-            try {
-                elasticsearchReaderConfiguration.setSearch(mapper.readValue(searchConfig.root().render(ConfigRenderOptions.concise()), Map.class));
-            } catch (IOException e) {
-                e.printStackTrace();
-                LOGGER.warn("Could not parse _search supplied by config");
-            }
+        try {
+            elasticsearchReaderConfiguration = mapper.readValue(elasticsearch.root().render(ConfigRenderOptions.concise()), ElasticsearchReaderConfiguration.class);
+        } catch (Exception e) {
+            e.printStackTrace();
+            LOGGER.warn("Could not parse elasticsearchconfiguration");
         }
 
         return elasticsearchReaderConfiguration;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/64c608f1/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
index dbf7d25..a60467d 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
@@ -18,42 +18,13 @@
 
 package org.apache.streams.elasticsearch;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Objects;
-import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
-import com.typesafe.config.Config;
-import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsPersistWriter;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
-import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
-import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
-import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
-import org.elasticsearch.action.bulk.BulkItemResponse;
-import org.elasticsearch.action.bulk.BulkRequestBuilder;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.search.SearchRequestBuilder;
 import org.elasticsearch.action.update.UpdateRequest;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.common.settings.ImmutableSettings;
-import org.elasticsearch.index.query.IdsQueryBuilder;
-import org.elasticsearch.search.SearchHit;
-import org.elasticsearch.search.SearchHits;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.Closeable;
-import java.io.Flushable;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.text.DecimalFormat;
-import java.text.NumberFormat;
-import java.util.*;
-
 //import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
 
 public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter implements StreamsPersistWriter {
@@ -76,6 +47,10 @@ public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter impl
         Preconditions.checkNotNull(streamsDatum.getMetadata());
         Preconditions.checkNotNull(streamsDatum.getMetadata().get("id"));
 
+        LOGGER.debug("Update Metadata: {}", streamsDatum.getMetadata());
+
+        LOGGER.debug("Update Document: {}", streamsDatum.getDocument());
+
         String index;
         String type;
         String id;
@@ -84,19 +59,25 @@ public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter impl
 
             json = OBJECT_MAPPER.writeValueAsString(streamsDatum.getDocument());
 
-            index = Optional.fromNullable(
-                    (String) streamsDatum.getMetadata().get("index"))
-                    .or(config.getIndex());
-            type = Optional.fromNullable(
-                    (String) streamsDatum.getMetadata().get("type"))
-                    .or(config.getType());
-            id = (String) streamsDatum.getMetadata().get("id");
+            index = (String) streamsDatum.getMetadata().get("index");
+            type = (String) streamsDatum.getMetadata().get("type");
+            id = setId(streamsDatum);
 
-            update(index, type, id, json);
+            if(index == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) {
+                index = config.getIndex();
+            }
+            if(type == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) {
+                type = config.getType();
+            }
 
-        } catch (JsonProcessingException e) {
-            LOGGER.warn("{} {}", e.getLocation(), e.getMessage());
+            LOGGER.debug("Attempt Update: ({},{},{}) {}", index, type, id, json);
 
+            update(index, type, id, json);
+
+        } catch (Exception e) {
+            LOGGER.warn("Exception: {} ", e.getMessage());
+        } catch (Error e) {
+            LOGGER.warn("Error: {} ", e.getMessage());
         }
     }
 
@@ -113,6 +94,9 @@ public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter impl
                 .id(id)
                 .doc(json);
 
+        // add fields
+        updateRequest.docAsUpsert(true);
+
         add(updateRequest);
 
     }