You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/10/11 22:37:52 UTC
[02/17] git commit: updater should work without metadata general
cleanup of configurator
updater should work without metadata
general cleanup of configurator
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/64c608f1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/64c608f1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/64c608f1
Branch: refs/heads/STREAMS-170
Commit: 64c608f10d91435b5b127726d6e670daaa0cb6be
Parents: df00e4a
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Thu Sep 11 12:07:33 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Thu Sep 11 12:07:33 2014 -0500
----------------------------------------------------------------------
.../ElasticsearchConfigurator.java | 41 ++++---------
.../ElasticsearchPersistUpdater.java | 62 ++++++++------------
2 files changed, 36 insertions(+), 67 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/64c608f1/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
index 1c66789..439b5de 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
@@ -24,10 +24,6 @@ import com.typesafe.config.ConfigRenderOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
/**
* Converts a {@link com.typesafe.config.Config} element into an instance of ElasticSearchConfiguration
*/
@@ -38,39 +34,28 @@ public class ElasticsearchConfigurator {
private final static ObjectMapper mapper = new ObjectMapper();
public static ElasticsearchConfiguration detectConfiguration(Config elasticsearch) {
- List<String> hosts = elasticsearch.getStringList("hosts");
- Long port = elasticsearch.getLong("port");
- String clusterName = elasticsearch.getString("clusterName");
- ElasticsearchConfiguration elasticsearchConfiguration = new ElasticsearchConfiguration();
+ ElasticsearchConfiguration elasticsearchConfiguration = null;
- elasticsearchConfiguration.setHosts(hosts);
- elasticsearchConfiguration.setPort(port);
- elasticsearchConfiguration.setClusterName(clusterName);
+ try {
+ elasticsearchConfiguration = mapper.readValue(elasticsearch.root().render(ConfigRenderOptions.concise()), ElasticsearchConfiguration.class);
+ } catch (Exception e) {
+ e.printStackTrace();
+ LOGGER.warn("Could not parse elasticsearchconfiguration");
+ }
return elasticsearchConfiguration;
}
public static ElasticsearchReaderConfiguration detectReaderConfiguration(Config elasticsearch) {
- ElasticsearchConfiguration elasticsearchConfiguration = detectConfiguration(elasticsearch);
- ElasticsearchReaderConfiguration elasticsearchReaderConfiguration = mapper.convertValue(elasticsearchConfiguration, ElasticsearchReaderConfiguration.class);
+ ElasticsearchReaderConfiguration elasticsearchReaderConfiguration = null;
- List<String> indexes = elasticsearch.getStringList("indexes");
- List<String> types = elasticsearch.getStringList("types");
-
- elasticsearchReaderConfiguration.setIndexes(indexes);
- elasticsearchReaderConfiguration.setTypes(types);
-
- if( elasticsearch.hasPath("_search") ) {
- LOGGER.info("_search supplied by config");
- Config searchConfig = elasticsearch.getConfig("_search");
- try {
- elasticsearchReaderConfiguration.setSearch(mapper.readValue(searchConfig.root().render(ConfigRenderOptions.concise()), Map.class));
- } catch (IOException e) {
- e.printStackTrace();
- LOGGER.warn("Could not parse _search supplied by config");
- }
+ try {
+ elasticsearchReaderConfiguration = mapper.readValue(elasticsearch.root().render(ConfigRenderOptions.concise()), ElasticsearchReaderConfiguration.class);
+ } catch (Exception e) {
+ e.printStackTrace();
+ LOGGER.warn("Could not parse elasticsearchconfiguration");
}
return elasticsearchReaderConfiguration;
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/64c608f1/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
index dbf7d25..a60467d 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
@@ -18,42 +18,13 @@
package org.apache.streams.elasticsearch;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Objects;
-import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
-import com.typesafe.config.Config;
-import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsPersistWriter;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
-import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
-import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
-import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
-import org.elasticsearch.action.bulk.BulkItemResponse;
-import org.elasticsearch.action.bulk.BulkRequestBuilder;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.update.UpdateRequest;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.common.settings.ImmutableSettings;
-import org.elasticsearch.index.query.IdsQueryBuilder;
-import org.elasticsearch.search.SearchHit;
-import org.elasticsearch.search.SearchHits;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.Closeable;
-import java.io.Flushable;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.text.DecimalFormat;
-import java.text.NumberFormat;
-import java.util.*;
-
//import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter implements StreamsPersistWriter {
@@ -76,6 +47,10 @@ public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter impl
Preconditions.checkNotNull(streamsDatum.getMetadata());
Preconditions.checkNotNull(streamsDatum.getMetadata().get("id"));
+ LOGGER.debug("Update Metadata: {}", streamsDatum.getMetadata());
+
+ LOGGER.debug("Update Document: {}", streamsDatum.getDocument());
+
String index;
String type;
String id;
@@ -84,19 +59,25 @@ public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter impl
json = OBJECT_MAPPER.writeValueAsString(streamsDatum.getDocument());
- index = Optional.fromNullable(
- (String) streamsDatum.getMetadata().get("index"))
- .or(config.getIndex());
- type = Optional.fromNullable(
- (String) streamsDatum.getMetadata().get("type"))
- .or(config.getType());
- id = (String) streamsDatum.getMetadata().get("id");
+ index = (String) streamsDatum.getMetadata().get("index");
+ type = (String) streamsDatum.getMetadata().get("type");
+ id = setId(streamsDatum);
- update(index, type, id, json);
+ if(index == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) {
+ index = config.getIndex();
+ }
+ if(type == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) {
+ type = config.getType();
+ }
- } catch (JsonProcessingException e) {
- LOGGER.warn("{} {}", e.getLocation(), e.getMessage());
+ LOGGER.debug("Attempt Update: ({},{},{}) {}", index, type, id, json);
+ update(index, type, id, json);
+
+ } catch (Exception e) {
+ LOGGER.warn("Exception: {} ", e.getMessage());
+ } catch (Error e) {
+ LOGGER.warn("Error: {} ", e.getMessage());
}
}
@@ -113,6 +94,9 @@ public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter impl
.id(id)
.doc(json);
+ // add fields
+ updateRequest.docAsUpsert(true);
+
add(updateRequest);
}