You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/10/11 22:37:53 UTC
[03/17] git commit: changed how index, type, id get set to allow for absent metadata
changed how index, type, id get set to allow for absent metadata
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/63e2d428
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/63e2d428
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/63e2d428
Branch: refs/heads/STREAMS-170
Commit: 63e2d428aebc0b6619e6b4167ac8ad8f33709c75
Parents: 64c608f
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Thu Sep 11 12:36:00 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Thu Sep 11 12:36:00 2014 -0500
----------------------------------------------------------------------
.../ElasticsearchPersistDeleter.java | 48 ++++++++++++--------
.../ElasticsearchPersistUpdater.java | 46 ++++++++++---------
.../ElasticsearchPersistWriter.java | 17 -------
3 files changed, 55 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/63e2d428/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java
index fece72e..9bb585c 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java
@@ -18,16 +18,15 @@
package org.apache.streams.elasticsearch;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsPersistWriter;
import org.elasticsearch.action.delete.DeleteRequest;
-import org.elasticsearch.action.update.UpdateRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Map;
+
public class ElasticsearchPersistDeleter extends ElasticsearchPersistWriter implements StreamsPersistWriter {
private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistDeleter.class);
@@ -43,25 +42,38 @@ public class ElasticsearchPersistDeleter extends ElasticsearchPersistWriter impl
@Override
public void write(StreamsDatum streamsDatum) {
- Preconditions.checkNotNull(streamsDatum);
- Preconditions.checkNotNull(streamsDatum.getDocument());
- Preconditions.checkNotNull(streamsDatum.getMetadata());
- Preconditions.checkNotNull(streamsDatum.getMetadata().get("id"));
+ if(streamsDatum == null || streamsDatum.getDocument() == null)
+ return;
+
+ LOGGER.debug("Update Document: {}", streamsDatum.getDocument());
- String index;
- String type;
- String id;
+ Map<String, Object> metadata = streamsDatum.getMetadata();
- index = Optional.fromNullable(
- (String) streamsDatum.getMetadata().get("index"))
- .or(config.getIndex());
- type = Optional.fromNullable(
- (String) streamsDatum.getMetadata().get("type"))
- .or(config.getType());
- id = (String) streamsDatum.getMetadata().get("id");
+ LOGGER.debug("Update Metadata: {}", metadata);
- delete(index, type, id);
+ String index = null;
+ String type = null;
+ String id = streamsDatum.getId();
+ if( metadata != null && metadata.containsKey("index"))
+ index = (String) streamsDatum.getMetadata().get("index");
+ if( metadata != null && metadata.containsKey("type"))
+ type = (String) streamsDatum.getMetadata().get("type");
+ if( id == null && metadata != null && metadata.containsKey("id"))
+ id = (String) streamsDatum.getMetadata().get("id");
+
+ if(index == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) {
+ index = config.getIndex();
+ }
+ if(type == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) {
+ type = config.getType();
+ }
+
+ try {
+ delete(index, type, id);
+ } catch (Throwable e) {
+ LOGGER.warn("Unable to Delete Datum from ElasticSearch: {}", e.getMessage());
+ }
}
public void delete(String index, String type, String id) {
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/63e2d428/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
index a60467d..b8584e5 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
@@ -25,7 +25,7 @@ import org.elasticsearch.action.update.UpdateRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-//import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
+import java.util.Map;
public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter implements StreamsPersistWriter {
@@ -42,33 +42,37 @@ public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter impl
@Override
public void write(StreamsDatum streamsDatum) {
- Preconditions.checkNotNull(streamsDatum);
- Preconditions.checkNotNull(streamsDatum.getDocument());
- Preconditions.checkNotNull(streamsDatum.getMetadata());
- Preconditions.checkNotNull(streamsDatum.getMetadata().get("id"));
-
- LOGGER.debug("Update Metadata: {}", streamsDatum.getMetadata());
+ if(streamsDatum == null || streamsDatum.getDocument() == null)
+ return;
LOGGER.debug("Update Document: {}", streamsDatum.getDocument());
- String index;
- String type;
- String id;
- String json;
- try {
+ Map<String, Object> metadata = streamsDatum.getMetadata();
- json = OBJECT_MAPPER.writeValueAsString(streamsDatum.getDocument());
+ LOGGER.debug("Update Metadata: {}", metadata);
+ String index = null;
+ String type = null;
+ String id = streamsDatum.getId();
+
+ if( metadata != null && metadata.containsKey("index"))
index = (String) streamsDatum.getMetadata().get("index");
+ if( metadata != null && metadata.containsKey("type"))
type = (String) streamsDatum.getMetadata().get("type");
- id = setId(streamsDatum);
-
- if(index == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) {
- index = config.getIndex();
- }
- if(type == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) {
- type = config.getType();
- }
+ if( id == null && metadata != null && metadata.containsKey("id"))
+ id = (String) streamsDatum.getMetadata().get("id");
+
+ if(index == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) {
+ index = config.getIndex();
+ }
+ if(type == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) {
+ type = config.getType();
+ }
+
+ String json;
+ try {
+
+ json = OBJECT_MAPPER.writeValueAsString(streamsDatum.getDocument());
LOGGER.debug("Attempt Update: ({},{},{}) {}", index, type, id, json);
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/63e2d428/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
index bfb21f5..169c941 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
@@ -19,12 +19,8 @@
package org.apache.streams.elasticsearch;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.core.TreeNode;
-import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.core.*;
@@ -43,7 +39,6 @@ import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.fasterxml.jackson.core.JsonParser;
import java.io.IOException;
import java.io.Serializable;
@@ -173,18 +168,6 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt
}
}
- private String setId(StreamsDatum streamsDatum) {
- String id = Optional.fromNullable(
- (String) streamsDatum.getMetadata().get("id"))
- .orNull();
-
- if(id == null)
- id = Optional.fromNullable(streamsDatum.getId())
- .orNull();
-
- return id;
- }
-
private String convertAndAppendMetadata(StreamsDatum streamsDatum) throws IOException {
Object object = streamsDatum.getDocument();