You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/09/18 00:01:37 UTC
[1/6] git commit: writer should work without metadata
Repository: incubator-streams
Updated Branches:
refs/heads/master 35a8fbf4b -> b4d71241b
writer should work without metadata
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/df00e4a0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/df00e4a0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/df00e4a0
Branch: refs/heads/master
Commit: df00e4a06376bacda755618595922cd3382d121a
Parents: 3ee5175
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Thu Sep 11 11:32:53 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Thu Sep 11 11:32:53 2014 -0500
----------------------------------------------------------------------
.../elasticsearch/ElasticsearchPersistWriter.java | 15 ++++++++++++---
1 file changed, 12 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/df00e4a0/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
index 664dd24..bfb21f5 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
@@ -144,9 +144,18 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt
checkForBackOff();
- String index = (String) streamsDatum.getMetadata().get("index");
- String type = (String) streamsDatum.getMetadata().get("type");
- String id = setId(streamsDatum);
+ Map<String, Object> metadata = streamsDatum.getMetadata();
+
+ String index = null;
+ String type = null;
+ String id = streamsDatum.getId();
+
+ if( metadata != null && metadata.containsKey("index"))
+ index = (String) streamsDatum.getMetadata().get("index");
+ if( metadata != null && metadata.containsKey("type"))
+ type = (String) streamsDatum.getMetadata().get("type");
+ if( id == null && metadata != null && metadata.containsKey("id"))
+ id = (String) streamsDatum.getMetadata().get("id");
if(index == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) {
index = config.getIndex();
[5/6] git commit: refactored to move duplicated code to new protected methods in ElasticsearchPersistWriter
Posted by sb...@apache.org.
refactored to move duplicated code to new protected methods in ElasticsearchPersistWriter
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/5bf87552
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/5bf87552
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/5bf87552
Branch: refs/heads/master
Commit: 5bf8755236ff3fe53eae7b0f131bfbf57647ba29
Parents: 9c98a45
Author: sblackmon <sb...@apache.org>
Authored: Tue Sep 16 13:59:20 2014 -0500
Committer: sblackmon <sb...@apache.org>
Committed: Tue Sep 16 13:59:20 2014 -0500
----------------------------------------------------------------------
.../ElasticsearchPersistDeleter.java | 20 +------
.../ElasticsearchPersistUpdater.java | 20 +------
.../ElasticsearchPersistWriter.java | 62 ++++++++++++++------
3 files changed, 51 insertions(+), 51 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5bf87552/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java
index 95004f5..319cece 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java
@@ -51,23 +51,9 @@ public class ElasticsearchPersistDeleter extends ElasticsearchPersistWriter impl
LOGGER.debug("Delete Metadata: {}", metadata);
- String index = null;
- String type = null;
- String id = streamsDatum.getId();
-
- if( metadata != null && metadata.containsKey("index"))
- index = (String) streamsDatum.getMetadata().get("index");
- if( metadata != null && metadata.containsKey("type"))
- type = (String) streamsDatum.getMetadata().get("type");
- if( id == null && metadata != null && metadata.containsKey("id"))
- id = (String) streamsDatum.getMetadata().get("id");
-
- if(index == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) {
- index = config.getIndex();
- }
- if(type == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) {
- type = config.getType();
- }
+ String index = getIndex(metadata, config);
+ String type = getType(metadata, config);
+ String id = getId(streamsDatum);
try {
delete(index, type, id);
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5bf87552/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
index d0e8acd..872c65e 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
@@ -51,23 +51,9 @@ public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter impl
LOGGER.debug("Update Metadata: {}", metadata);
- String index = null;
- String type = null;
- String id = streamsDatum.getId();
-
- if( metadata != null && metadata.containsKey("index"))
- index = (String) streamsDatum.getMetadata().get("index");
- if( metadata != null && metadata.containsKey("type"))
- type = (String) streamsDatum.getMetadata().get("type");
- if( id == null && metadata != null && metadata.containsKey("id"))
- id = (String) streamsDatum.getMetadata().get("id");
-
- if(index == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) {
- index = config.getIndex();
- }
- if(type == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) {
- type = config.getType();
- }
+ String index = getIndex(metadata, config);
+ String type = getType(metadata, config);
+ String id = getId(streamsDatum);
String json;
try {
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5bf87552/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
index ca643b9..4ec3315 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
@@ -145,23 +145,9 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt
LOGGER.debug("Write Metadata: {}", metadata);
- String index = null;
- String type = null;
- String id = streamsDatum.getId();
-
- if( metadata != null && metadata.containsKey("index"))
- index = (String) streamsDatum.getMetadata().get("index");
- if( metadata != null && metadata.containsKey("type"))
- type = (String) streamsDatum.getMetadata().get("type");
- if( id == null && metadata != null && metadata.containsKey("id"))
- id = (String) streamsDatum.getMetadata().get("id");
-
- if(index == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) {
- index = config.getIndex();
- }
- if(type == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) {
- type = config.getType();
- }
+ String index = getIndex(metadata, config);
+ String type = getType(metadata, config);
+ String id = getId(streamsDatum);
try {
add(index, type, id,
@@ -506,4 +492,46 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt
MEGABYTE_FORMAT.format(sizeInBytes / (double) (1024 * 1024)), NUMBER_FORMAT.format(passed), NUMBER_FORMAT.format(failed), NUMBER_FORMAT.format(millis),
MEGABYTE_FORMAT.format((double) totalSizeInBytes.get() / (double) (1024 * 1024)), NUMBER_FORMAT.format(totalOk), NUMBER_FORMAT.format(totalFailed), NUMBER_FORMAT.format(totalSeconds), NUMBER_FORMAT.format(getTotalOutstanding()));
}
+
+ protected String getIndex(Map<String, Object> metadata, ElasticsearchWriterConfiguration config) {
+
+ String index = null;
+
+ if( metadata != null && metadata.containsKey("index"))
+ index = (String) metadata.get("index");
+
+ if(index == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) {
+ index = config.getIndex();
+ }
+
+ return index;
+ }
+
+ protected String getType(Map<String, Object> metadata, ElasticsearchWriterConfiguration config) {
+
+ String type = null;
+
+ if( metadata != null && metadata.containsKey("type"))
+ type = (String) metadata.get("type");
+
+ if(type == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) {
+ type = config.getType();
+ }
+
+
+ return type;
+ }
+
+ protected String getId(StreamsDatum datum) {
+
+ String id = datum.getId();
+
+ Map<String, Object> metadata = datum.getMetadata();
+
+ if( id == null && metadata != null && metadata.containsKey("id"))
+ id = (String) datum.getMetadata().get("id");
+
+ return id;
+ }
+
}
[6/6] git commit: Merge branch 'STREAMS-164'
Posted by sb...@apache.org.
Merge branch 'STREAMS-164'
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/b4d71241
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/b4d71241
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/b4d71241
Branch: refs/heads/master
Commit: b4d71241b36806ddc02ffaa90892dbf4e93f254f
Parents: 35a8fbf 5bf8755
Author: sblackmon <sb...@apache.org>
Authored: Wed Sep 17 16:44:46 2014 -0500
Committer: sblackmon <sb...@apache.org>
Committed: Wed Sep 17 16:44:46 2014 -0500
----------------------------------------------------------------------
.../ElasticsearchConfigurator.java | 41 ++++-------
.../ElasticsearchPersistDeleter.java | 34 +++++----
.../ElasticsearchPersistUpdater.java | 66 +++++------------
.../ElasticsearchPersistWriter.java | 76 +++++++++++++-------
4 files changed, 98 insertions(+), 119 deletions(-)
----------------------------------------------------------------------
[3/6] git commit: changed how index, type, id get set to allow for absent metadata
Posted by sb...@apache.org.
changed how index, type, id get set to allow for absent metadata
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/63e2d428
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/63e2d428
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/63e2d428
Branch: refs/heads/master
Commit: 63e2d428aebc0b6619e6b4167ac8ad8f33709c75
Parents: 64c608f
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Thu Sep 11 12:36:00 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Thu Sep 11 12:36:00 2014 -0500
----------------------------------------------------------------------
.../ElasticsearchPersistDeleter.java | 48 ++++++++++++--------
.../ElasticsearchPersistUpdater.java | 46 ++++++++++---------
.../ElasticsearchPersistWriter.java | 17 -------
3 files changed, 55 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/63e2d428/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java
index fece72e..9bb585c 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java
@@ -18,16 +18,15 @@
package org.apache.streams.elasticsearch;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsPersistWriter;
import org.elasticsearch.action.delete.DeleteRequest;
-import org.elasticsearch.action.update.UpdateRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Map;
+
public class ElasticsearchPersistDeleter extends ElasticsearchPersistWriter implements StreamsPersistWriter {
private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistDeleter.class);
@@ -43,25 +42,38 @@ public class ElasticsearchPersistDeleter extends ElasticsearchPersistWriter impl
@Override
public void write(StreamsDatum streamsDatum) {
- Preconditions.checkNotNull(streamsDatum);
- Preconditions.checkNotNull(streamsDatum.getDocument());
- Preconditions.checkNotNull(streamsDatum.getMetadata());
- Preconditions.checkNotNull(streamsDatum.getMetadata().get("id"));
+ if(streamsDatum == null || streamsDatum.getDocument() == null)
+ return;
+
+ LOGGER.debug("Update Document: {}", streamsDatum.getDocument());
- String index;
- String type;
- String id;
+ Map<String, Object> metadata = streamsDatum.getMetadata();
- index = Optional.fromNullable(
- (String) streamsDatum.getMetadata().get("index"))
- .or(config.getIndex());
- type = Optional.fromNullable(
- (String) streamsDatum.getMetadata().get("type"))
- .or(config.getType());
- id = (String) streamsDatum.getMetadata().get("id");
+ LOGGER.debug("Update Metadata: {}", metadata);
- delete(index, type, id);
+ String index = null;
+ String type = null;
+ String id = streamsDatum.getId();
+ if( metadata != null && metadata.containsKey("index"))
+ index = (String) streamsDatum.getMetadata().get("index");
+ if( metadata != null && metadata.containsKey("type"))
+ type = (String) streamsDatum.getMetadata().get("type");
+ if( id == null && metadata != null && metadata.containsKey("id"))
+ id = (String) streamsDatum.getMetadata().get("id");
+
+ if(index == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) {
+ index = config.getIndex();
+ }
+ if(type == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) {
+ type = config.getType();
+ }
+
+ try {
+ delete(index, type, id);
+ } catch (Throwable e) {
+ LOGGER.warn("Unable to Delete Datum from ElasticSearch: {}", e.getMessage());
+ }
}
public void delete(String index, String type, String id) {
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/63e2d428/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
index a60467d..b8584e5 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
@@ -25,7 +25,7 @@ import org.elasticsearch.action.update.UpdateRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-//import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
+import java.util.Map;
public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter implements StreamsPersistWriter {
@@ -42,33 +42,37 @@ public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter impl
@Override
public void write(StreamsDatum streamsDatum) {
- Preconditions.checkNotNull(streamsDatum);
- Preconditions.checkNotNull(streamsDatum.getDocument());
- Preconditions.checkNotNull(streamsDatum.getMetadata());
- Preconditions.checkNotNull(streamsDatum.getMetadata().get("id"));
-
- LOGGER.debug("Update Metadata: {}", streamsDatum.getMetadata());
+ if(streamsDatum == null || streamsDatum.getDocument() == null)
+ return;
LOGGER.debug("Update Document: {}", streamsDatum.getDocument());
- String index;
- String type;
- String id;
- String json;
- try {
+ Map<String, Object> metadata = streamsDatum.getMetadata();
- json = OBJECT_MAPPER.writeValueAsString(streamsDatum.getDocument());
+ LOGGER.debug("Update Metadata: {}", metadata);
+ String index = null;
+ String type = null;
+ String id = streamsDatum.getId();
+
+ if( metadata != null && metadata.containsKey("index"))
index = (String) streamsDatum.getMetadata().get("index");
+ if( metadata != null && metadata.containsKey("type"))
type = (String) streamsDatum.getMetadata().get("type");
- id = setId(streamsDatum);
-
- if(index == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) {
- index = config.getIndex();
- }
- if(type == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) {
- type = config.getType();
- }
+ if( id == null && metadata != null && metadata.containsKey("id"))
+ id = (String) streamsDatum.getMetadata().get("id");
+
+ if(index == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) {
+ index = config.getIndex();
+ }
+ if(type == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) {
+ type = config.getType();
+ }
+
+ String json;
+ try {
+
+ json = OBJECT_MAPPER.writeValueAsString(streamsDatum.getDocument());
LOGGER.debug("Attempt Update: ({},{},{}) {}", index, type, id, json);
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/63e2d428/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
index bfb21f5..169c941 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
@@ -19,12 +19,8 @@
package org.apache.streams.elasticsearch;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.core.TreeNode;
-import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.core.*;
@@ -43,7 +39,6 @@ import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.fasterxml.jackson.core.JsonParser;
import java.io.IOException;
import java.io.Serializable;
@@ -173,18 +168,6 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt
}
}
- private String setId(StreamsDatum streamsDatum) {
- String id = Optional.fromNullable(
- (String) streamsDatum.getMetadata().get("id"))
- .orNull();
-
- if(id == null)
- id = Optional.fromNullable(streamsDatum.getId())
- .orNull();
-
- return id;
- }
-
private String convertAndAppendMetadata(StreamsDatum streamsDatum) throws IOException {
Object object = streamsDatum.getDocument();
[4/6] git commit: more consistent logging / exception handling
Posted by sb...@apache.org.
more consistent logging / exception handling
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/9c98a450
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/9c98a450
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/9c98a450
Branch: refs/heads/master
Commit: 9c98a450115f7e136d002c2df311aeebc3f860fe
Parents: 63e2d42
Author: sblackmon <sb...@apache.org>
Authored: Fri Sep 12 16:01:39 2014 -0500
Committer: sblackmon <sb...@apache.org>
Committed: Fri Sep 12 16:01:39 2014 -0500
----------------------------------------------------------------------
.../streams/elasticsearch/ElasticsearchPersistDeleter.java | 6 +++---
.../streams/elasticsearch/ElasticsearchPersistUpdater.java | 6 ++----
.../streams/elasticsearch/ElasticsearchPersistWriter.java | 4 ++++
3 files changed, 9 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9c98a450/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java
index 9bb585c..95004f5 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java
@@ -45,11 +45,11 @@ public class ElasticsearchPersistDeleter extends ElasticsearchPersistWriter impl
if(streamsDatum == null || streamsDatum.getDocument() == null)
return;
- LOGGER.debug("Update Document: {}", streamsDatum.getDocument());
+ LOGGER.debug("Delete Document: {}", streamsDatum.getDocument());
Map<String, Object> metadata = streamsDatum.getMetadata();
- LOGGER.debug("Update Metadata: {}", metadata);
+ LOGGER.debug("Delete Metadata: {}", metadata);
String index = null;
String type = null;
@@ -72,7 +72,7 @@ public class ElasticsearchPersistDeleter extends ElasticsearchPersistWriter impl
try {
delete(index, type, id);
} catch (Throwable e) {
- LOGGER.warn("Unable to Delete Datum from ElasticSearch: {}", e.getMessage());
+ LOGGER.warn("Unable to Delete Document from ElasticSearch: {}", e.getMessage());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9c98a450/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
index b8584e5..d0e8acd 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
@@ -78,10 +78,8 @@ public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter impl
update(index, type, id, json);
- } catch (Exception e) {
- LOGGER.warn("Exception: {} ", e.getMessage());
- } catch (Error e) {
- LOGGER.warn("Error: {} ", e.getMessage());
+ } catch (Throwable e) {
+ LOGGER.warn("Unable to Update Document in ElasticSearch: {}", e.getMessage());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9c98a450/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
index 169c941..ca643b9 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
@@ -139,8 +139,12 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt
checkForBackOff();
+ LOGGER.debug("Write Document: {}", streamsDatum.getDocument());
+
Map<String, Object> metadata = streamsDatum.getMetadata();
+ LOGGER.debug("Write Metadata: {}", metadata);
+
String index = null;
String type = null;
String id = streamsDatum.getId();
[2/6] git commit: updater should work without metadata general cleanup of configurator
Posted by sb...@apache.org.
updater should work without metadata
general cleanup of configurator
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/64c608f1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/64c608f1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/64c608f1
Branch: refs/heads/master
Commit: 64c608f10d91435b5b127726d6e670daaa0cb6be
Parents: df00e4a
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Thu Sep 11 12:07:33 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Thu Sep 11 12:07:33 2014 -0500
----------------------------------------------------------------------
.../ElasticsearchConfigurator.java | 41 ++++---------
.../ElasticsearchPersistUpdater.java | 62 ++++++++------------
2 files changed, 36 insertions(+), 67 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/64c608f1/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
index 1c66789..439b5de 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
@@ -24,10 +24,6 @@ import com.typesafe.config.ConfigRenderOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
/**
* Converts a {@link com.typesafe.config.Config} element into an instance of ElasticSearchConfiguration
*/
@@ -38,39 +34,28 @@ public class ElasticsearchConfigurator {
private final static ObjectMapper mapper = new ObjectMapper();
public static ElasticsearchConfiguration detectConfiguration(Config elasticsearch) {
- List<String> hosts = elasticsearch.getStringList("hosts");
- Long port = elasticsearch.getLong("port");
- String clusterName = elasticsearch.getString("clusterName");
- ElasticsearchConfiguration elasticsearchConfiguration = new ElasticsearchConfiguration();
+ ElasticsearchConfiguration elasticsearchConfiguration = null;
- elasticsearchConfiguration.setHosts(hosts);
- elasticsearchConfiguration.setPort(port);
- elasticsearchConfiguration.setClusterName(clusterName);
+ try {
+ elasticsearchConfiguration = mapper.readValue(elasticsearch.root().render(ConfigRenderOptions.concise()), ElasticsearchConfiguration.class);
+ } catch (Exception e) {
+ e.printStackTrace();
+ LOGGER.warn("Could not parse elasticsearchconfiguration");
+ }
return elasticsearchConfiguration;
}
public static ElasticsearchReaderConfiguration detectReaderConfiguration(Config elasticsearch) {
- ElasticsearchConfiguration elasticsearchConfiguration = detectConfiguration(elasticsearch);
- ElasticsearchReaderConfiguration elasticsearchReaderConfiguration = mapper.convertValue(elasticsearchConfiguration, ElasticsearchReaderConfiguration.class);
+ ElasticsearchReaderConfiguration elasticsearchReaderConfiguration = null;
- List<String> indexes = elasticsearch.getStringList("indexes");
- List<String> types = elasticsearch.getStringList("types");
-
- elasticsearchReaderConfiguration.setIndexes(indexes);
- elasticsearchReaderConfiguration.setTypes(types);
-
- if( elasticsearch.hasPath("_search") ) {
- LOGGER.info("_search supplied by config");
- Config searchConfig = elasticsearch.getConfig("_search");
- try {
- elasticsearchReaderConfiguration.setSearch(mapper.readValue(searchConfig.root().render(ConfigRenderOptions.concise()), Map.class));
- } catch (IOException e) {
- e.printStackTrace();
- LOGGER.warn("Could not parse _search supplied by config");
- }
+ try {
+ elasticsearchReaderConfiguration = mapper.readValue(elasticsearch.root().render(ConfigRenderOptions.concise()), ElasticsearchReaderConfiguration.class);
+ } catch (Exception e) {
+ e.printStackTrace();
+ LOGGER.warn("Could not parse elasticsearchconfiguration");
}
return elasticsearchReaderConfiguration;
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/64c608f1/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
index dbf7d25..a60467d 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
@@ -18,42 +18,13 @@
package org.apache.streams.elasticsearch;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Objects;
-import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
-import com.typesafe.config.Config;
-import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsPersistWriter;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
-import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
-import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
-import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
-import org.elasticsearch.action.bulk.BulkItemResponse;
-import org.elasticsearch.action.bulk.BulkRequestBuilder;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.update.UpdateRequest;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.common.settings.ImmutableSettings;
-import org.elasticsearch.index.query.IdsQueryBuilder;
-import org.elasticsearch.search.SearchHit;
-import org.elasticsearch.search.SearchHits;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.Closeable;
-import java.io.Flushable;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.text.DecimalFormat;
-import java.text.NumberFormat;
-import java.util.*;
-
//import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter implements StreamsPersistWriter {
@@ -76,6 +47,10 @@ public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter impl
Preconditions.checkNotNull(streamsDatum.getMetadata());
Preconditions.checkNotNull(streamsDatum.getMetadata().get("id"));
+ LOGGER.debug("Update Metadata: {}", streamsDatum.getMetadata());
+
+ LOGGER.debug("Update Document: {}", streamsDatum.getDocument());
+
String index;
String type;
String id;
@@ -84,19 +59,25 @@ public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter impl
json = OBJECT_MAPPER.writeValueAsString(streamsDatum.getDocument());
- index = Optional.fromNullable(
- (String) streamsDatum.getMetadata().get("index"))
- .or(config.getIndex());
- type = Optional.fromNullable(
- (String) streamsDatum.getMetadata().get("type"))
- .or(config.getType());
- id = (String) streamsDatum.getMetadata().get("id");
+ index = (String) streamsDatum.getMetadata().get("index");
+ type = (String) streamsDatum.getMetadata().get("type");
+ id = setId(streamsDatum);
- update(index, type, id, json);
+ if(index == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) {
+ index = config.getIndex();
+ }
+ if(type == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) {
+ type = config.getType();
+ }
- } catch (JsonProcessingException e) {
- LOGGER.warn("{} {}", e.getLocation(), e.getMessage());
+ LOGGER.debug("Attempt Update: ({},{},{}) {}", index, type, id, json);
+ update(index, type, id, json);
+
+ } catch (Exception e) {
+ LOGGER.warn("Exception: {} ", e.getMessage());
+ } catch (Error e) {
+ LOGGER.warn("Error: {} ", e.getMessage());
}
}
@@ -113,6 +94,9 @@ public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter impl
.id(id)
.doc(json);
+ // add fields
+ updateRequest.docAsUpsert(true);
+
add(updateRequest);
}