You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/10/11 22:37:51 UTC
[01/17] git commit: writer should work without metadata
Repository: incubator-streams
Updated Branches:
refs/heads/STREAMS-170 933060e42 -> 6cd97b8e8
writer should work without metadata
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/df00e4a0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/df00e4a0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/df00e4a0
Branch: refs/heads/STREAMS-170
Commit: df00e4a06376bacda755618595922cd3382d121a
Parents: 3ee5175
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Thu Sep 11 11:32:53 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Thu Sep 11 11:32:53 2014 -0500
----------------------------------------------------------------------
.../elasticsearch/ElasticsearchPersistWriter.java | 15 ++++++++++++---
1 file changed, 12 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/df00e4a0/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
index 664dd24..bfb21f5 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
@@ -144,9 +144,18 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt
checkForBackOff();
- String index = (String) streamsDatum.getMetadata().get("index");
- String type = (String) streamsDatum.getMetadata().get("type");
- String id = setId(streamsDatum);
+ Map<String, Object> metadata = streamsDatum.getMetadata();
+
+ String index = null;
+ String type = null;
+ String id = streamsDatum.getId();
+
+ if( metadata != null && metadata.containsKey("index"))
+ index = (String) streamsDatum.getMetadata().get("index");
+ if( metadata != null && metadata.containsKey("type"))
+ type = (String) streamsDatum.getMetadata().get("type");
+ if( id == null && metadata != null && metadata.containsKey("id"))
+ id = (String) streamsDatum.getMetadata().get("id");
if(index == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) {
index = config.getIndex();
[14/17] git commit: Utility Processors to - populate datum from
metadata - populate datum from metadata in document field
Posted by sb...@apache.org.
Utility Processors to
- populate datum from metadata
- populate datum from metadata in document field
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/aef8860f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/aef8860f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/aef8860f
Branch: refs/heads/STREAMS-170
Commit: aef8860f95002045322265e4834331b57643a50f
Parents: 8c45159
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Thu Sep 11 12:51:22 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Wed Sep 24 12:31:08 2014 -0500
----------------------------------------------------------------------
.../DatumFromMetadataAsDocumentProcessor.java | 128 +++++++++++++++++++
.../processor/DatumFromMetadataProcessor.java | 91 +++++++++++++
2 files changed, 219 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aef8860f/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataAsDocumentProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataAsDocumentProcessor.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataAsDocumentProcessor.java
new file mode 100644
index 0000000..cfad87e
--- /dev/null
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataAsDocumentProcessor.java
@@ -0,0 +1,128 @@
+package org.apache.streams.elasticsearch.processor;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.datatype.jsonorg.JsonOrgModule;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.typesafe.config.Config;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.elasticsearch.ElasticsearchClientManager;
+import org.apache.streams.elasticsearch.ElasticsearchConfigurator;
+import org.apache.streams.elasticsearch.ElasticsearchReaderConfiguration;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.elasticsearch.action.get.GetRequestBuilder;
+import org.elasticsearch.action.get.GetResponse;
+import org.joda.time.DateTime;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Created by sblackmon on 9/4/14.
+ */
+public class DatumFromMetadataAsDocumentProcessor implements StreamsProcessor, Serializable {
+
+    // NOTE(review): this value is identical to DatumFromMetadataProcessor.STREAMS_ID, which looks like a
+    // copy-paste slip. It is left unchanged here because callers may already depend on it - confirm.
+    public final static String STREAMS_ID = "DatumFromMetadataProcessor";
+
+    private ElasticsearchClientManager elasticsearchClientManager;
+    private ElasticsearchReaderConfiguration config;
+
+    // Assigned in prepare(); process() must not be called before prepare().
+    private ObjectMapper mapper;
+
+    /**
+     * Creates a processor configured from the "elasticsearch" section of the global streams configuration.
+     */
+    public DatumFromMetadataAsDocumentProcessor() {
+        Config config = StreamsConfigurator.config.getConfig("elasticsearch");
+        this.config = ElasticsearchConfigurator.detectReaderConfiguration(config);
+    }
+
+    /**
+     * Creates a processor whose reader configuration is detected from the supplied typesafe config.
+     * @param config the configuration handed to ElasticsearchConfigurator.detectReaderConfiguration
+     */
+    public DatumFromMetadataAsDocumentProcessor(Config config) {
+        this.config = ElasticsearchConfigurator.detectReaderConfiguration(config);
+    }
+
+    /**
+     * Creates a processor that uses the supplied reader configuration as-is.
+     * @param config the reader configuration
+     */
+    public DatumFromMetadataAsDocumentProcessor(ElasticsearchReaderConfiguration config) {
+        this.config = config;
+    }
+
+    /**
+     * Parses the datum's document as a JSON object holding "index", "type" and "id", fetches the
+     * referenced document from Elasticsearch and stores its source (and "_timestamp", when present)
+     * back into the datum.
+     * @param entry the datum whose document is a JSON string describing what to fetch
+     * @return a list containing the updated datum, or an empty list when the datum is null, has no
+     *         metadata, has a document that is not a parsable JSON object string, or the referenced
+     *         document does not exist or has an empty source
+     */
+    @Override
+    public List<StreamsDatum> process(StreamsDatum entry) {
+        List<StreamsDatum> result = Lists.newArrayList();
+
+        // Validate the datum before reading its document, so a null entry yields an empty result
+        // instead of a NullPointerException.
+        if(entry == null || entry.getMetadata() == null)
+            return result;
+
+        // Only a String document can be parsed below; anything else (including null) is skipped
+        // rather than failing on the cast or inside the parser.
+        if( !(entry.getDocument() instanceof String) )
+            return result;
+
+        ObjectNode metadataObjectNode;
+        try {
+            metadataObjectNode = mapper.readValue((String) entry.getDocument(), ObjectNode.class);
+        } catch (IOException e) {
+            return result;
+        }
+
+        // A document consisting of the JSON literal null parses to a null node.
+        if( metadataObjectNode == null )
+            return result;
+
+        Map<String, Object> metadata = asMap(metadataObjectNode);
+
+        String index = (String) metadata.get("index");
+        String type = (String) metadata.get("type");
+        String id = (String) metadata.get("id");
+
+        // Fall back to the first configured index / type and to the datum's own id.
+        if( index == null ) {
+            index = this.config.getIndexes().get(0);
+        }
+        if( type == null ) {
+            type = this.config.getTypes().get(0);
+        }
+        if( id == null ) {
+            id = entry.getId();
+        }
+
+        GetRequestBuilder getRequestBuilder = elasticsearchClientManager.getClient().prepareGet(index, type, id);
+        getRequestBuilder.setFields("*", "_timestamp");
+        getRequestBuilder.setFetchSource(true);
+        GetResponse getResponse = getRequestBuilder.get();
+
+        if( getResponse == null || getResponse.isExists() == false || getResponse.isSourceEmpty() == true )
+            return result;
+
+        entry.setDocument(getResponse.getSource());
+        if( getResponse.getField("_timestamp") != null) {
+            DateTime timestamp = new DateTime(((Long) getResponse.getField("_timestamp").getValue()).longValue());
+            entry.setTimestamp(timestamp);
+        }
+
+        result.add(entry);
+
+        return result;
+    }
+
+    /**
+     * Creates the Elasticsearch client manager from this processor's configuration and obtains the
+     * mapper from StreamsJacksonMapper.getInstance(), registering JsonOrgModule on it.
+     * @param configurationObject not used by this implementation
+     */
+    @Override
+    public void prepare(Object configurationObject) {
+        this.elasticsearchClientManager = new ElasticsearchClientManager(config);
+        mapper = StreamsJacksonMapper.getInstance();
+        mapper.registerModule(new JsonOrgModule());
+    }
+
+    /**
+     * Closes the Elasticsearch client obtained from the client manager created in prepare().
+     */
+    @Override
+    public void cleanUp() {
+        this.elasticsearchClientManager.getClient().close();
+    }
+
+    /**
+     * Copies the top-level fields of a JSON node into a map keyed by field name, using each field
+     * value's asText() rendering as the map value.
+     * @param node the node whose fields are copied
+     * @return a new map of field name to text value; fields whose asText() is null are omitted
+     */
+    public Map<String, Object> asMap(JsonNode node) {
+
+        Iterator<Map.Entry<String, JsonNode>> iterator = node.fields();
+        Map<String, Object> ret = Maps.newHashMap();
+
+        Map.Entry<String, JsonNode> entry;
+
+        while (iterator.hasNext()) {
+            entry = iterator.next();
+            if( entry.getValue().asText() != null )
+                ret.put(entry.getKey(), entry.getValue().asText());
+        }
+
+        return ret;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aef8860f/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataProcessor.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataProcessor.java
new file mode 100644
index 0000000..170749d
--- /dev/null
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataProcessor.java
@@ -0,0 +1,91 @@
+package org.apache.streams.elasticsearch.processor;
+
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.elasticsearch.ElasticsearchClientManager;
+import org.apache.streams.elasticsearch.ElasticsearchConfigurator;
+import org.apache.streams.elasticsearch.ElasticsearchReaderConfiguration;
+import org.elasticsearch.action.get.GetRequestBuilder;
+import org.elasticsearch.action.get.GetResponse;
+import org.joda.time.DateTime;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Created by sblackmon on 9/4/14.
+ */
+public class DatumFromMetadataProcessor implements StreamsProcessor, Serializable {
+
+    public final static String STREAMS_ID = "DatumFromMetadataProcessor";
+
+    private ElasticsearchClientManager elasticsearchClientManager;
+    private ElasticsearchReaderConfiguration config;
+
+    /**
+     * Configures the processor from the "elasticsearch" section of the global streams configuration.
+     */
+    public DatumFromMetadataProcessor() {
+        this(StreamsConfigurator.config.getConfig("elasticsearch"));
+    }
+
+    /**
+     * Configures the processor with the reader configuration detected from the given typesafe config.
+     * @param config the configuration passed to ElasticsearchConfigurator.detectReaderConfiguration
+     */
+    public DatumFromMetadataProcessor(Config config) {
+        this(ElasticsearchConfigurator.detectReaderConfiguration(config));
+    }
+
+    /**
+     * Configures the processor with an already-built reader configuration.
+     * @param config the reader configuration to use
+     */
+    public DatumFromMetadataProcessor(ElasticsearchReaderConfiguration config) {
+        this.config = config;
+    }
+
+    /**
+     * Looks up the document addressed by the datum's "index", "type" and "id" metadata and stores its
+     * source (and "_timestamp", when returned) into the datum.
+     * @param entry the datum to populate
+     * @return a list holding the populated datum, or an empty list when the datum or its metadata is
+     *         null, or when the document is missing or has an empty source
+     */
+    @Override
+    public List<StreamsDatum> process(StreamsDatum entry) {
+        List<StreamsDatum> output = Lists.newArrayList();
+
+        if( entry == null || entry.getMetadata() == null )
+            return output;
+
+        String metaIndex = (String) entry.getMetadata().get("index");
+        String metaType = (String) entry.getMetadata().get("type");
+        String metaId = (String) entry.getMetadata().get("id");
+
+        // Anything missing from the metadata falls back to the first configured index / type
+        // and to the datum's own id.
+        String targetIndex = ( metaIndex != null ) ? metaIndex : config.getIndexes().get(0);
+        String targetType = ( metaType != null ) ? metaType : config.getTypes().get(0);
+        String targetId = ( metaId != null ) ? metaId : entry.getId();
+
+        GetResponse response = fetch(targetIndex, targetType, targetId);
+
+        boolean usable = response != null && response.isExists() && !response.isSourceEmpty();
+        if( !usable )
+            return output;
+
+        entry.setDocument(response.getSource());
+        if( response.getField("_timestamp") != null ) {
+            Long millis = (Long) response.getField("_timestamp").getValue();
+            entry.setTimestamp(new DateTime(millis.longValue()));
+        }
+
+        output.add(entry);
+        return output;
+    }
+
+    /**
+     * Issues a get request for one document, asking for all fields plus "_timestamp" and the source.
+     */
+    private GetResponse fetch(String index, String type, String id) {
+        GetRequestBuilder request = elasticsearchClientManager.getClient().prepareGet(index, type, id);
+        request.setFields("*", "_timestamp");
+        request.setFetchSource(true);
+        return request.get();
+    }
+
+    /**
+     * Creates the Elasticsearch client manager from this processor's configuration.
+     * @param configurationObject not used by this implementation
+     */
+    @Override
+    public void prepare(Object configurationObject) {
+        elasticsearchClientManager = new ElasticsearchClientManager(config);
+    }
+
+    /**
+     * Closes the client obtained from the client manager created in prepare().
+     */
+    @Override
+    public void cleanUp() {
+        elasticsearchClientManager.getClient().close();
+    }
+}
[15/17] git commit: license / javadoc headers
Posted by sb...@apache.org.
license / javadoc headers
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/bfd1bc1d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/bfd1bc1d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/bfd1bc1d
Branch: refs/heads/STREAMS-170
Commit: bfd1bc1d1f7c58f001c6aeb7fa430e7a70f299ed
Parents: aef8860
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Tue Sep 16 11:22:36 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Wed Sep 24 12:31:08 2014 -0500
----------------------------------------------------------------------
.../DatumFromMetadataAsDocumentProcessor.java | 20 +++++++++++++++++++-
.../processor/DatumFromMetadataProcessor.java | 20 +++++++++++++++++++-
2 files changed, 38 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/bfd1bc1d/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataAsDocumentProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataAsDocumentProcessor.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataAsDocumentProcessor.java
index cfad87e..5e58bb0 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataAsDocumentProcessor.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataAsDocumentProcessor.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
package org.apache.streams.elasticsearch.processor;
import com.fasterxml.jackson.databind.JsonNode;
@@ -25,7 +43,7 @@ import java.util.List;
import java.util.Map;
/**
- * Created by sblackmon on 9/4/14.
+ * Uses index and type in metadata map stored in datum document to populate current document into datums
*/
public class DatumFromMetadataAsDocumentProcessor implements StreamsProcessor, Serializable {
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/bfd1bc1d/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataProcessor.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataProcessor.java
index 170749d..a6e0838 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataProcessor.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataProcessor.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
package org.apache.streams.elasticsearch.processor;
import com.google.common.collect.Lists;
@@ -16,7 +34,7 @@ import java.io.Serializable;
import java.util.List;
/**
- * Created by sblackmon on 9/4/14.
+ * Uses index and type in metadata to populate current document into datums
*/
public class DatumFromMetadataProcessor implements StreamsProcessor, Serializable {
[06/17] git commit: Merge branch 'STREAMS-164'
Posted by sb...@apache.org.
Merge branch 'STREAMS-164'
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/b4d71241
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/b4d71241
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/b4d71241
Branch: refs/heads/STREAMS-170
Commit: b4d71241b36806ddc02ffaa90892dbf4e93f254f
Parents: 35a8fbf 5bf8755
Author: sblackmon <sb...@apache.org>
Authored: Wed Sep 17 16:44:46 2014 -0500
Committer: sblackmon <sb...@apache.org>
Committed: Wed Sep 17 16:44:46 2014 -0500
----------------------------------------------------------------------
.../ElasticsearchConfigurator.java | 41 ++++-------
.../ElasticsearchPersistDeleter.java | 34 +++++----
.../ElasticsearchPersistUpdater.java | 66 +++++------------
.../ElasticsearchPersistWriter.java | 76 +++++++++++++-------
4 files changed, 98 insertions(+), 119 deletions(-)
----------------------------------------------------------------------
[11/17] git commit: STREAMS-180 | InstagramTypeConverter is now able
to properly serialize UserDataItem objects into Activities.
Posted by sb...@apache.org.
STREAMS-180 | InstagramTypeConverter is now able to properly serialize UserDataItem objects into Activities.
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/119608d2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/119608d2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/119608d2
Branch: refs/heads/STREAMS-170
Commit: 119608d2f196b24cfba4db8aaab19022a7131cf5
Parents: 2397f4b
Author: Robert Douglas <rd...@w2odigital.com>
Authored: Mon Sep 22 16:49:19 2014 -0500
Committer: Robert Douglas <rd...@w2odigital.com>
Committed: Mon Sep 22 16:49:19 2014 -0500
----------------------------------------------------------------------
.../processor/InstagramTypeConverter.java | 5 +-
.../serializer/util/InstagramActivityUtil.java | 46 ++++++++++++++++-
.../test/InstagramActivitySerDeTest.java | 53 +++++++++++++++++++-
.../src/test/resources/testUserInfoData.txt | 2 +
4 files changed, 102 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/119608d2/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/processor/InstagramTypeConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/processor/InstagramTypeConverter.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/processor/InstagramTypeConverter.java
index 90ec3a8..f0101fd 100644
--- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/processor/InstagramTypeConverter.java
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/processor/InstagramTypeConverter.java
@@ -78,9 +78,10 @@ public class InstagramTypeConverter implements StreamsProcessor {
} else if(item instanceof UserInfoData) {
- activity = this.userInfoSerializer.deserialize((UserInfoData) item );
+ activity = new Activity();
+ instagramActivityUtil.updateActivity((UserInfoData) item, activity);
}
- if(activity != null && activity.getId() != null) {
+ if(activity != null) {
result = new StreamsDatum(activity);
count++;
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/119608d2/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/util/InstagramActivityUtil.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/util/InstagramActivityUtil.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/util/InstagramActivityUtil.java
index 499d0e7..c210be8 100644
--- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/util/InstagramActivityUtil.java
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/util/InstagramActivityUtil.java
@@ -26,6 +26,8 @@ import org.apache.streams.exceptions.ActivitySerializerException;
import org.apache.streams.pojo.json.*;
import org.jinstagram.entity.comments.CommentData;
import org.jinstagram.entity.common.*;
+import org.jinstagram.entity.users.basicinfo.Counts;
+import org.jinstagram.entity.users.basicinfo.UserInfoData;
import org.jinstagram.entity.users.feed.MediaFeedData;
import org.joda.time.DateTime;
import org.slf4j.Logger;
@@ -72,11 +74,53 @@ public class InstagramActivityUtil {
}
/**
+ * Updates the given Activity object with the values from the item
+ * @param item the object to use as the source
+ * @param activity the target of the updates. Will receive all values from the item.
+ * @throws ActivitySerializerException
+ */
+    public static void updateActivity(UserInfoData item, Activity activity) throws ActivitySerializerException {
+        // The actor is the only part of the activity derived from the user-info record.
+        Actor userActor = buildActor(item);
+        activity.setActor(userActor);
+
+        // The activity id is explicitly cleared; only the actor built above carries an id.
+        activity.setId(null);
+        activity.setProvider(getProvider());
+    }
+
+    /**
+     * Builds the actor from an Instagram user-info record.
+     * Any exception raised while reading the record is logged and whatever was set on the actor up to
+     * that point is returned, so this method never throws.
+     * @param item the user-info record to read
+     * @return an Actor; fully populated on success, possibly partially populated after a logged failure
+     */
+    public static Actor buildActor(UserInfoData item) {
+        Actor actor = new Actor();
+
+        try {
+            Image image = new Image();
+            image.setUrl(item.getProfile_picture());
+
+            Map<String, Object> extensions = new HashMap<String, Object>();
+
+            // If the record carries no counts, leave out the follower figures instead of
+            // abandoning the whole actor (id, image, name) on a NullPointerException.
+            Counts counts = item.getCounts();
+            if (counts != null) {
+                extensions.put("followers", counts.getFollwed_by());
+                extensions.put("follows", counts.getFollows());
+            }
+            extensions.put("screenName", item.getUsername());
+
+            actor.setId(formatId(String.valueOf(item.getId())));
+            actor.setImage(image);
+            actor.setDisplayName(item.getFullName());
+            actor.setSummary(item.getBio());
+            actor.setUrl(item.getWebsite());
+
+            actor.setAdditionalProperty("handle", item.getUsername());
+            actor.setAdditionalProperty("extensions", extensions);
+        } catch (Exception e) {
+            // The exception is passed as the trailing argument so the stack trace is logged,
+            // not just the message.
+            LOGGER.error("Exception trying to build actor object: {}", e.getMessage(), e);
+        }
+
+        return actor;
+    }
+
+ /**
* Builds the actor
* @param item the item
* @return a valid Actor
*/
- public static Actor buildActor(MediaFeedData item) {
+ public static Actor buildActor(MediaFeedData item) {
Actor actor = new Actor();
try {
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/119608d2/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/twitter/test/InstagramActivitySerDeTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/twitter/test/InstagramActivitySerDeTest.java b/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/twitter/test/InstagramActivitySerDeTest.java
index 075da80..de5e466 100644
--- a/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/twitter/test/InstagramActivitySerDeTest.java
+++ b/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/twitter/test/InstagramActivitySerDeTest.java
@@ -24,6 +24,7 @@ import org.apache.streams.instagram.serializer.util.InstagramDeserializer;
import org.apache.streams.instagram.serializer.InstagramJsonActivitySerializer;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.pojo.json.Activity;
+import org.jinstagram.entity.users.basicinfo.UserInfoData;
import org.jinstagram.entity.users.feed.MediaFeedData;
import org.junit.Assert;
import org.junit.Test;
@@ -33,6 +34,7 @@ import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
+import java.util.Map;
import static org.apache.streams.instagram.serializer.util.InstagramActivityUtil.updateActivity;
import static org.hamcrest.CoreMatchers.*;
@@ -50,7 +52,7 @@ public class InstagramActivitySerDeTest {
private final static Logger LOGGER = LoggerFactory.getLogger(InstagramActivitySerDeTest.class);
@Test
- public void Tests() {
+ public void TestMediaFeedObjects() {
InstagramDeserializer instagramDeserializer = new InstagramDeserializer("");
InputStream is = InstagramActivitySerDeTest.class.getResourceAsStream("/testMediaFeedObjects.txt");
InputStreamReader isr = new InputStreamReader(is);
@@ -85,4 +87,53 @@ public class InstagramActivitySerDeTest {
Assert.fail();
}
}
+
+    @Test
+    public void TestUserInfoData() {
+        InstagramDeserializer deserializer = new InstagramDeserializer("");
+        InputStream resource = InstagramActivitySerDeTest.class.getResourceAsStream("/testUserInfoData.txt");
+        BufferedReader reader = new BufferedReader(new InputStreamReader(resource));
+
+        try {
+            // Each non-empty line of the resource is one serialized UserInfoData record.
+            while (reader.ready()) {
+                String rawLine = reader.readLine();
+                if(StringUtils.isEmpty(rawLine))
+                    continue;
+
+                LOGGER.info("raw: {}", rawLine);
+
+                UserInfoData parsed = deserializer.createObjectFromResponse(UserInfoData.class, rawLine);
+
+                Activity activity = new Activity();
+
+                // Logged before updateActivity runs, so this shows the freshly created activity.
+                LOGGER.info("activity: {}", activity.toString());
+
+                updateActivity(parsed, activity);
+                assertThat(activity, is(not(nullValue())));
+
+                assertThat(activity.getId(), is(nullValue()));
+
+                Actor actor = activity.getActor();
+                assertThat(actor, is(not(nullValue())));
+                assertThat(activity.getActor().getImage(), is(not(nullValue())));
+                assertThat(activity.getActor().getDisplayName(), is(not(nullValue())));
+                assertThat(activity.getActor().getSummary(), is(not(nullValue())));
+
+                Map<String, Object> extensions = (Map<String, Object>)activity.getActor().getAdditionalProperties().get("extensions");
+                assertThat(extensions, is(not(nullValue())));
+                assertThat(extensions.get("follows"), is(not(nullValue())));
+                assertThat(extensions.get("followers"), is(not(nullValue())));
+                assertThat(extensions.get("screenName"), is(not(nullValue())));
+
+                assertThat(activity.getActor().getAdditionalProperties().get("handle"), is(not(nullValue())));
+                assertThat(activity.getActor().getId(), is(not(nullValue())));
+                assertThat(activity.getActor().getUrl(), is(not(nullValue())));
+                // NOTE(review): updateActivity(UserInfoData, Activity) sets only actor, id and provider,
+                // so this presumably relies on Activity supplying a default verb - confirm.
+                assertThat(activity.getVerb(), is(not(nullValue())));
+                assertThat(activity.getProvider(), is(not(nullValue())));
+            }
+        } catch( Exception e ) {
+            System.out.println(e);
+            e.printStackTrace();
+            Assert.fail();
+        }
+    }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/119608d2/streams-contrib/streams-provider-instagram/src/test/resources/testUserInfoData.txt
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/test/resources/testUserInfoData.txt b/streams-contrib/streams-provider-instagram/src/test/resources/testUserInfoData.txt
new file mode 100644
index 0000000..679de91
--- /dev/null
+++ b/streams-contrib/streams-provider-instagram/src/test/resources/testUserInfoData.txt
@@ -0,0 +1,2 @@
+{"bio":"This is the official Instagram account for Speaker John Boehner, a regular guy with a big job.","counts":{"follows":72,"follwed_by":4844,"media":129},"first_name":null,"id":249235106,"last_name":null,"profile_picture":"http://images.ak.instagram.com/profiles/profile_249235106_75sq_1388678585.jpg","username":"speakerboehner","full_name":"Speaker John Boehner","website":"http://speaker.gov"}
+{"bio":"","counts":{"follows":37,"follwed_by":3083,"media":70},"first_name":null,"id":44979810,"last_name":null,"profile_picture":"http://images.ak.instagram.com/profiles/profile_44979810_75sq_1335477964.jpg","username":"senjohnmccain","full_name":"John McCain","website":"http://mccain.senate.gov"}
\ No newline at end of file
[07/17] git commit: addresses failing test in streams-processor-urls
Posted by sb...@apache.org.
addresses failing test in streams-processor-urls
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/dc5e7de3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/dc5e7de3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/dc5e7de3
Branch: refs/heads/STREAMS-170
Commit: dc5e7de3c5c2e2fe905928785aaf9b059df34747
Parents: b4d7124
Author: sblackmon <sb...@apache.org>
Authored: Wed Sep 17 16:58:50 2014 -0500
Committer: sblackmon <sb...@apache.org>
Committed: Wed Sep 17 16:58:50 2014 -0500
----------------------------------------------------------------------
.../src/main/java/org/apache/streams/urls/LinkResolver.java | 6 ++++++
1 file changed, 6 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dc5e7de3/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkResolver.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkResolver.java b/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkResolver.java
index 2f7646b..5497b92 100644
--- a/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkResolver.java
+++ b/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkResolver.java
@@ -372,6 +372,12 @@ public class LinkResolver implements Serializable {
} catch (UnsupportedEncodingException uee) {
System.err.println("Unable to Decode URL. Decoding skipped.");
uee.printStackTrace();
+ } catch (NullPointerException npe) {
+ System.err.println("NPE Decoding URL. Decoding skipped.");
+ npe.printStackTrace();
+ } catch (Throwable e) {
+ System.err.println("Misc error Decoding URL. Decoding skipped.");
+ e.printStackTrace();
}
// Remove the protocol, http:// ftp:// or similar from the front
[17/17] git commit: DocumentToMetadataProcessor utility processor
Posted by sb...@apache.org.
DocumentToMetadataProcessor utility processor
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/6cd97b8e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/6cd97b8e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/6cd97b8e
Branch: refs/heads/STREAMS-170
Commit: 6cd97b8e8a77c4a29c3f3079223a94a4ef41b877
Parents: 576b849
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Wed Oct 1 15:42:55 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Wed Oct 1 15:42:55 2014 -0500
----------------------------------------------------------------------
.../DatumFromMetadataAsDocumentProcessor.java | 17 +--
.../processor/DocumentToMetadataProcessor.java | 118 +++++++++++++++++++
2 files changed, 119 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6cd97b8e/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataAsDocumentProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataAsDocumentProcessor.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataAsDocumentProcessor.java
index 5e58bb0..9aea4c4 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataAsDocumentProcessor.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataAsDocumentProcessor.java
@@ -78,7 +78,7 @@ public class DatumFromMetadataAsDocumentProcessor implements StreamsProcessor, S
return result;
}
- Map<String, Object> metadata = asMap(metadataObjectNode);
+ Map<String, Object> metadata = DocumentToMetadataProcessor.asMap(metadataObjectNode);
if(entry == null || entry.getMetadata() == null)
return result;
@@ -128,19 +128,4 @@ public class DatumFromMetadataAsDocumentProcessor implements StreamsProcessor, S
this.elasticsearchClientManager.getClient().close();
}
- public Map<String, Object> asMap(JsonNode node) {
-
- Iterator<Map.Entry<String, JsonNode>> iterator = node.fields();
- Map<String, Object> ret = Maps.newHashMap();
-
- Map.Entry<String, JsonNode> entry;
-
- while (iterator.hasNext()) {
- entry = iterator.next();
- if( entry.getValue().asText() != null )
- ret.put(entry.getKey(), entry.getValue().asText());
- }
-
- return ret;
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6cd97b8e/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DocumentToMetadataProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DocumentToMetadataProcessor.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DocumentToMetadataProcessor.java
new file mode 100644
index 0000000..804e1ac
--- /dev/null
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DocumentToMetadataProcessor.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.elasticsearch.processor;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.datatype.jsonorg.JsonOrgModule;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.typesafe.config.Config;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.elasticsearch.ElasticsearchClientManager;
+import org.apache.streams.elasticsearch.ElasticsearchConfigurator;
+import org.apache.streams.elasticsearch.ElasticsearchReaderConfiguration;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Parses each datum's JSON document and sets its top-level fields (as text) as the datum's metadata
+ */
+public class DocumentToMetadataProcessor implements StreamsProcessor, Serializable {
+
+ public final static String STREAMS_ID = "DatumFromMetadataProcessor";
+
+ private ElasticsearchClientManager elasticsearchClientManager;
+ private ElasticsearchReaderConfiguration config;
+
+ private ObjectMapper mapper;
+
+ public DocumentToMetadataProcessor() {
+ Config config = StreamsConfigurator.config.getConfig("elasticsearch");
+ this.config = ElasticsearchConfigurator.detectReaderConfiguration(config);
+ }
+
+ public DocumentToMetadataProcessor(Config config) {
+ this.config = ElasticsearchConfigurator.detectReaderConfiguration(config);
+ }
+
+ public DocumentToMetadataProcessor(ElasticsearchReaderConfiguration config) {
+ this.config = config;
+ }
+
+ @Override
+ public List<StreamsDatum> process(StreamsDatum entry) {
+ List<StreamsDatum> result = Lists.newArrayList();
+
+ ObjectNode metadataObjectNode;
+ try {
+ metadataObjectNode = mapper.readValue((String) entry.getDocument(), ObjectNode.class);
+ } catch (IOException e) {
+ return result;
+ }
+
+ Map<String, Object> metadata = asMap(metadataObjectNode);
+
+ if(entry == null || metadata == null)
+ return result;
+
+ entry.setMetadata(metadata);
+
+ result.add(entry);
+
+ return result;
+ }
+
+ @Override
+ public void prepare(Object configurationObject) {
+ this.elasticsearchClientManager = new ElasticsearchClientManager(config);
+ mapper = StreamsJacksonMapper.getInstance();
+ mapper.registerModule(new JsonOrgModule());
+
+ }
+
+ @Override
+ public void cleanUp() {
+ this.elasticsearchClientManager.getClient().close();
+ }
+
+ public static Map<String, Object> asMap(JsonNode node) {
+
+ Iterator<Map.Entry<String, JsonNode>> iterator = node.fields();
+ Map<String, Object> ret = Maps.newHashMap();
+
+ Map.Entry<String, JsonNode> entry;
+
+ while (iterator.hasNext()) {
+ entry = iterator.next();
+ if( entry.getValue().asText() != null )
+ ret.put(entry.getKey(), entry.getValue().asText());
+ }
+
+ return ret;
+ }
+}
[13/17] git commit: STREAMS-180 | Fixing issue where NPE could be
thrown if a post has no comments
Posted by sb...@apache.org.
STREAMS-180 | Fixing issue where NPE could be thrown if a post has no comments
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/8c45159b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/8c45159b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/8c45159b
Branch: refs/heads/STREAMS-170
Commit: 8c45159b0e034c533a6836bc5d23f83a02395af5
Parents: ef2e065
Author: Robert Douglas <rd...@w2odigital.com>
Authored: Mon Sep 22 17:23:59 2014 -0500
Committer: Robert Douglas <rd...@w2odigital.com>
Committed: Mon Sep 22 17:23:59 2014 -0500
----------------------------------------------------------------------
.../instagram/serializer/util/InstagramActivityUtil.java | 7 +++++--
1 file changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/8c45159b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/util/InstagramActivityUtil.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/util/InstagramActivityUtil.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/util/InstagramActivityUtil.java
index 5bbbb35..d41ea93 100644
--- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/util/InstagramActivityUtil.java
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/util/InstagramActivityUtil.java
@@ -322,8 +322,11 @@ public class InstagramActivityUtil {
Comments comments = item.getComments();
String commentsConcat = "";
- for(CommentData commentData : comments.getComments()) {
- commentsConcat += " " + commentData.getText();
+
+ if(comments != null) {
+ for (CommentData commentData : comments.getComments()) {
+ commentsConcat += " " + commentData.getText();
+ }
}
if(item.getCaption() != null) {
commentsConcat += " " + item.getCaption().getText();
[10/17] git commit: STREAMS-179 | Added javadoc comments.
Posted by sb...@apache.org.
STREAMS-179 | Added javadoc comments.
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/2397f4b4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/2397f4b4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/2397f4b4
Branch: refs/heads/STREAMS-170
Commit: 2397f4b480448ebd3e1a3c4efa4d9dc5ef64750a
Parents: e091c6c
Author: Ryan Ebanks <re...@Informations-MacBook-Pro-3.local>
Authored: Fri Sep 19 15:24:22 2014 -0500
Committer: Ryan Ebanks <re...@Informations-MacBook-Pro-3.local>
Committed: Fri Sep 19 15:24:22 2014 -0500
----------------------------------------------------------------------
.../ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2397f4b4/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java
index f8d6343..55a26e1 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java
@@ -7,6 +7,7 @@ import org.slf4j.LoggerFactory;
import java.util.concurrent.*;
/**
+ * A fixed ThreadPoolExecutor that will shutdown a stream upon a thread ending execution due to an unhandled throwable.
* @see {@link java.util.concurrent.ThreadPoolExecutor}
*/
public class ShutdownStreamOnUnhandleThrowableThreadPoolExecutor extends ThreadPoolExecutor {
[03/17] git commit: changed how index, type,
id get set to allow for absent metadata
Posted by sb...@apache.org.
changed how index, type, id get set to allow for absent metadata
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/63e2d428
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/63e2d428
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/63e2d428
Branch: refs/heads/STREAMS-170
Commit: 63e2d428aebc0b6619e6b4167ac8ad8f33709c75
Parents: 64c608f
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Thu Sep 11 12:36:00 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Thu Sep 11 12:36:00 2014 -0500
----------------------------------------------------------------------
.../ElasticsearchPersistDeleter.java | 48 ++++++++++++--------
.../ElasticsearchPersistUpdater.java | 46 ++++++++++---------
.../ElasticsearchPersistWriter.java | 17 -------
3 files changed, 55 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/63e2d428/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java
index fece72e..9bb585c 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java
@@ -18,16 +18,15 @@
package org.apache.streams.elasticsearch;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsPersistWriter;
import org.elasticsearch.action.delete.DeleteRequest;
-import org.elasticsearch.action.update.UpdateRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Map;
+
public class ElasticsearchPersistDeleter extends ElasticsearchPersistWriter implements StreamsPersistWriter {
private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistDeleter.class);
@@ -43,25 +42,38 @@ public class ElasticsearchPersistDeleter extends ElasticsearchPersistWriter impl
@Override
public void write(StreamsDatum streamsDatum) {
- Preconditions.checkNotNull(streamsDatum);
- Preconditions.checkNotNull(streamsDatum.getDocument());
- Preconditions.checkNotNull(streamsDatum.getMetadata());
- Preconditions.checkNotNull(streamsDatum.getMetadata().get("id"));
+ if(streamsDatum == null || streamsDatum.getDocument() == null)
+ return;
+
+ LOGGER.debug("Update Document: {}", streamsDatum.getDocument());
- String index;
- String type;
- String id;
+ Map<String, Object> metadata = streamsDatum.getMetadata();
- index = Optional.fromNullable(
- (String) streamsDatum.getMetadata().get("index"))
- .or(config.getIndex());
- type = Optional.fromNullable(
- (String) streamsDatum.getMetadata().get("type"))
- .or(config.getType());
- id = (String) streamsDatum.getMetadata().get("id");
+ LOGGER.debug("Update Metadata: {}", metadata);
- delete(index, type, id);
+ String index = null;
+ String type = null;
+ String id = streamsDatum.getId();
+ if( metadata != null && metadata.containsKey("index"))
+ index = (String) streamsDatum.getMetadata().get("index");
+ if( metadata != null && metadata.containsKey("type"))
+ type = (String) streamsDatum.getMetadata().get("type");
+ if( id == null && metadata != null && metadata.containsKey("id"))
+ id = (String) streamsDatum.getMetadata().get("id");
+
+ if(index == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) {
+ index = config.getIndex();
+ }
+ if(type == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) {
+ type = config.getType();
+ }
+
+ try {
+ delete(index, type, id);
+ } catch (Throwable e) {
+ LOGGER.warn("Unable to Delete Datum from ElasticSearch: {}", e.getMessage());
+ }
}
public void delete(String index, String type, String id) {
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/63e2d428/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
index a60467d..b8584e5 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
@@ -25,7 +25,7 @@ import org.elasticsearch.action.update.UpdateRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-//import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
+import java.util.Map;
public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter implements StreamsPersistWriter {
@@ -42,33 +42,37 @@ public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter impl
@Override
public void write(StreamsDatum streamsDatum) {
- Preconditions.checkNotNull(streamsDatum);
- Preconditions.checkNotNull(streamsDatum.getDocument());
- Preconditions.checkNotNull(streamsDatum.getMetadata());
- Preconditions.checkNotNull(streamsDatum.getMetadata().get("id"));
-
- LOGGER.debug("Update Metadata: {}", streamsDatum.getMetadata());
+ if(streamsDatum == null || streamsDatum.getDocument() == null)
+ return;
LOGGER.debug("Update Document: {}", streamsDatum.getDocument());
- String index;
- String type;
- String id;
- String json;
- try {
+ Map<String, Object> metadata = streamsDatum.getMetadata();
- json = OBJECT_MAPPER.writeValueAsString(streamsDatum.getDocument());
+ LOGGER.debug("Update Metadata: {}", metadata);
+ String index = null;
+ String type = null;
+ String id = streamsDatum.getId();
+
+ if( metadata != null && metadata.containsKey("index"))
index = (String) streamsDatum.getMetadata().get("index");
+ if( metadata != null && metadata.containsKey("type"))
type = (String) streamsDatum.getMetadata().get("type");
- id = setId(streamsDatum);
-
- if(index == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) {
- index = config.getIndex();
- }
- if(type == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) {
- type = config.getType();
- }
+ if( id == null && metadata != null && metadata.containsKey("id"))
+ id = (String) streamsDatum.getMetadata().get("id");
+
+ if(index == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) {
+ index = config.getIndex();
+ }
+ if(type == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) {
+ type = config.getType();
+ }
+
+ String json;
+ try {
+
+ json = OBJECT_MAPPER.writeValueAsString(streamsDatum.getDocument());
LOGGER.debug("Attempt Update: ({},{},{}) {}", index, type, id, json);
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/63e2d428/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
index bfb21f5..169c941 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
@@ -19,12 +19,8 @@
package org.apache.streams.elasticsearch;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.core.TreeNode;
-import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.core.*;
@@ -43,7 +39,6 @@ import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.fasterxml.jackson.core.JsonParser;
import java.io.IOException;
import java.io.Serializable;
@@ -173,18 +168,6 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt
}
}
- private String setId(StreamsDatum streamsDatum) {
- String id = Optional.fromNullable(
- (String) streamsDatum.getMetadata().get("id"))
- .orNull();
-
- if(id == null)
- id = Optional.fromNullable(streamsDatum.getId())
- .orNull();
-
- return id;
- }
-
private String convertAndAppendMetadata(StreamsDatum streamsDatum) throws IOException {
Object object = streamsDatum.getDocument();
[04/17] git commit: more consistent logging / exception handling
Posted by sb...@apache.org.
more consistent logging / exception handling
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/9c98a450
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/9c98a450
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/9c98a450
Branch: refs/heads/STREAMS-170
Commit: 9c98a450115f7e136d002c2df311aeebc3f860fe
Parents: 63e2d42
Author: sblackmon <sb...@apache.org>
Authored: Fri Sep 12 16:01:39 2014 -0500
Committer: sblackmon <sb...@apache.org>
Committed: Fri Sep 12 16:01:39 2014 -0500
----------------------------------------------------------------------
.../streams/elasticsearch/ElasticsearchPersistDeleter.java | 6 +++---
.../streams/elasticsearch/ElasticsearchPersistUpdater.java | 6 ++----
.../streams/elasticsearch/ElasticsearchPersistWriter.java | 4 ++++
3 files changed, 9 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9c98a450/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java
index 9bb585c..95004f5 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java
@@ -45,11 +45,11 @@ public class ElasticsearchPersistDeleter extends ElasticsearchPersistWriter impl
if(streamsDatum == null || streamsDatum.getDocument() == null)
return;
- LOGGER.debug("Update Document: {}", streamsDatum.getDocument());
+ LOGGER.debug("Delete Document: {}", streamsDatum.getDocument());
Map<String, Object> metadata = streamsDatum.getMetadata();
- LOGGER.debug("Update Metadata: {}", metadata);
+ LOGGER.debug("Delete Metadata: {}", metadata);
String index = null;
String type = null;
@@ -72,7 +72,7 @@ public class ElasticsearchPersistDeleter extends ElasticsearchPersistWriter impl
try {
delete(index, type, id);
} catch (Throwable e) {
- LOGGER.warn("Unable to Delete Datum from ElasticSearch: {}", e.getMessage());
+ LOGGER.warn("Unable to Delete Document from ElasticSearch: {}", e.getMessage());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9c98a450/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
index b8584e5..d0e8acd 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
@@ -78,10 +78,8 @@ public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter impl
update(index, type, id, json);
- } catch (Exception e) {
- LOGGER.warn("Exception: {} ", e.getMessage());
- } catch (Error e) {
- LOGGER.warn("Error: {} ", e.getMessage());
+ } catch (Throwable e) {
+ LOGGER.warn("Unable to Update Document in ElasticSearch: {}", e.getMessage());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9c98a450/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
index 169c941..ca643b9 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
@@ -139,8 +139,12 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt
checkForBackOff();
+ LOGGER.debug("Write Document: {}", streamsDatum.getDocument());
+
Map<String, Object> metadata = streamsDatum.getMetadata();
+ LOGGER.debug("Write Metadata: {}", metadata);
+
String index = null;
String type = null;
String id = streamsDatum.getId();
[16/17] git commit: Merge branch 'STREAMS-170' of
https://git-wip-us.apache.org/repos/asf/incubator-streams into STREAMS-170
Posted by sb...@apache.org.
Merge branch 'STREAMS-170' of https://git-wip-us.apache.org/repos/asf/incubator-streams into STREAMS-170
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/576b8497
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/576b8497
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/576b8497
Branch: refs/heads/STREAMS-170
Commit: 576b84970959bbdaa283b541d699214ecf86d2b5
Parents: bfd1bc1 933060e
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Wed Sep 24 12:31:23 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Wed Sep 24 12:31:23 2014 -0500
----------------------------------------------------------------------
----------------------------------------------------------------------
[02/17] git commit: updater should work without metadata general
cleanup of configurator
Posted by sb...@apache.org.
updater should work without metadata
general cleanup of configurator
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/64c608f1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/64c608f1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/64c608f1
Branch: refs/heads/STREAMS-170
Commit: 64c608f10d91435b5b127726d6e670daaa0cb6be
Parents: df00e4a
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Thu Sep 11 12:07:33 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Thu Sep 11 12:07:33 2014 -0500
----------------------------------------------------------------------
.../ElasticsearchConfigurator.java | 41 ++++---------
.../ElasticsearchPersistUpdater.java | 62 ++++++++------------
2 files changed, 36 insertions(+), 67 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/64c608f1/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
index 1c66789..439b5de 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
@@ -24,10 +24,6 @@ import com.typesafe.config.ConfigRenderOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
/**
* Converts a {@link com.typesafe.config.Config} element into an instance of ElasticSearchConfiguration
*/
@@ -38,39 +34,28 @@ public class ElasticsearchConfigurator {
private final static ObjectMapper mapper = new ObjectMapper();
public static ElasticsearchConfiguration detectConfiguration(Config elasticsearch) {
- List<String> hosts = elasticsearch.getStringList("hosts");
- Long port = elasticsearch.getLong("port");
- String clusterName = elasticsearch.getString("clusterName");
- ElasticsearchConfiguration elasticsearchConfiguration = new ElasticsearchConfiguration();
+ ElasticsearchConfiguration elasticsearchConfiguration = null;
- elasticsearchConfiguration.setHosts(hosts);
- elasticsearchConfiguration.setPort(port);
- elasticsearchConfiguration.setClusterName(clusterName);
+ try {
+ elasticsearchConfiguration = mapper.readValue(elasticsearch.root().render(ConfigRenderOptions.concise()), ElasticsearchConfiguration.class);
+ } catch (Exception e) {
+ e.printStackTrace();
+ LOGGER.warn("Could not parse elasticsearchconfiguration");
+ }
return elasticsearchConfiguration;
}
public static ElasticsearchReaderConfiguration detectReaderConfiguration(Config elasticsearch) {
- ElasticsearchConfiguration elasticsearchConfiguration = detectConfiguration(elasticsearch);
- ElasticsearchReaderConfiguration elasticsearchReaderConfiguration = mapper.convertValue(elasticsearchConfiguration, ElasticsearchReaderConfiguration.class);
+ ElasticsearchReaderConfiguration elasticsearchReaderConfiguration = null;
- List<String> indexes = elasticsearch.getStringList("indexes");
- List<String> types = elasticsearch.getStringList("types");
-
- elasticsearchReaderConfiguration.setIndexes(indexes);
- elasticsearchReaderConfiguration.setTypes(types);
-
- if( elasticsearch.hasPath("_search") ) {
- LOGGER.info("_search supplied by config");
- Config searchConfig = elasticsearch.getConfig("_search");
- try {
- elasticsearchReaderConfiguration.setSearch(mapper.readValue(searchConfig.root().render(ConfigRenderOptions.concise()), Map.class));
- } catch (IOException e) {
- e.printStackTrace();
- LOGGER.warn("Could not parse _search supplied by config");
- }
+ try {
+ elasticsearchReaderConfiguration = mapper.readValue(elasticsearch.root().render(ConfigRenderOptions.concise()), ElasticsearchReaderConfiguration.class);
+ } catch (Exception e) {
+ e.printStackTrace();
+ LOGGER.warn("Could not parse elasticsearchconfiguration");
}
return elasticsearchReaderConfiguration;
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/64c608f1/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
index dbf7d25..a60467d 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
@@ -18,42 +18,13 @@
package org.apache.streams.elasticsearch;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Objects;
-import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
-import com.typesafe.config.Config;
-import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsPersistWriter;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
-import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
-import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
-import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
-import org.elasticsearch.action.bulk.BulkItemResponse;
-import org.elasticsearch.action.bulk.BulkRequestBuilder;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.update.UpdateRequest;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.common.settings.ImmutableSettings;
-import org.elasticsearch.index.query.IdsQueryBuilder;
-import org.elasticsearch.search.SearchHit;
-import org.elasticsearch.search.SearchHits;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.Closeable;
-import java.io.Flushable;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.text.DecimalFormat;
-import java.text.NumberFormat;
-import java.util.*;
-
//import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter implements StreamsPersistWriter {
@@ -76,6 +47,10 @@ public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter impl
Preconditions.checkNotNull(streamsDatum.getMetadata());
Preconditions.checkNotNull(streamsDatum.getMetadata().get("id"));
+ LOGGER.debug("Update Metadata: {}", streamsDatum.getMetadata());
+
+ LOGGER.debug("Update Document: {}", streamsDatum.getDocument());
+
String index;
String type;
String id;
@@ -84,19 +59,25 @@ public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter impl
json = OBJECT_MAPPER.writeValueAsString(streamsDatum.getDocument());
- index = Optional.fromNullable(
- (String) streamsDatum.getMetadata().get("index"))
- .or(config.getIndex());
- type = Optional.fromNullable(
- (String) streamsDatum.getMetadata().get("type"))
- .or(config.getType());
- id = (String) streamsDatum.getMetadata().get("id");
+ index = (String) streamsDatum.getMetadata().get("index");
+ type = (String) streamsDatum.getMetadata().get("type");
+ id = setId(streamsDatum);
- update(index, type, id, json);
+ if(index == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) {
+ index = config.getIndex();
+ }
+ if(type == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) {
+ type = config.getType();
+ }
- } catch (JsonProcessingException e) {
- LOGGER.warn("{} {}", e.getLocation(), e.getMessage());
+ LOGGER.debug("Attempt Update: ({},{},{}) {}", index, type, id, json);
+ update(index, type, id, json);
+
+ } catch (Exception e) {
+ LOGGER.warn("Exception: {} ", e.getMessage());
+ } catch (Error e) {
+ LOGGER.warn("Error: {} ", e.getMessage());
}
}
@@ -113,6 +94,9 @@ public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter impl
.id(id)
.doc(json);
+ // add fields
+ updateRequest.docAsUpsert(true);
+
add(updateRequest);
}
[09/17] git commit: STREAMS-179 | Added new thread pool executor to shutdown streams
Posted by sb...@apache.org.
STREAMS-179 | Added new thread pool executor to shutdown streams
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/e091c6cc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/e091c6cc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/e091c6cc
Branch: refs/heads/STREAMS-170
Commit: e091c6ccc4f6ca849a638402ce8de5a7d73a8df0
Parents: 507e679
Author: Ryan Ebanks <re...@Informations-MacBook-Pro-3.local>
Authored: Fri Sep 19 15:22:32 2014 -0500
Committer: Ryan Ebanks <re...@Informations-MacBook-Pro-3.local>
Committed: Fri Sep 19 15:22:32 2014 -0500
----------------------------------------------------------------------
.../local/builders/LocalStreamBuilder.java | 3 +-
...amOnUnhandleThrowableThreadPoolExecutor.java | 45 ++++++++
...nhandledThrowableThreadPoolExecutorTest.java | 103 +++++++++++++++++++
3 files changed, 150 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e091c6cc/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
index 25f9fe7..bec1ff9 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
@@ -20,6 +20,7 @@ package org.apache.streams.local.builders;
import org.apache.log4j.spi.LoggerFactory;
import org.apache.streams.core.*;
+import org.apache.streams.local.executors.ShutdownStreamOnUnhandleThrowableThreadPoolExecutor;
import org.apache.streams.local.tasks.LocalStreamProcessMonitorThread;
import org.apache.streams.local.tasks.StatusCounterMonitorThread;
import org.apache.streams.local.tasks.StreamsProviderTask;
@@ -173,7 +174,7 @@ public class LocalStreamBuilder implements StreamBuilder {
public void start() {
attachShutdownHandler();
boolean isRunning = true;
- this.executor = Executors.newFixedThreadPool(this.totalTasks);
+ this.executor = new ShutdownStreamOnUnhandleThrowableThreadPoolExecutor(this.totalTasks, this);
this.monitor = Executors.newFixedThreadPool(this.monitorTasks+1);
Map<String, StreamsProviderTask> provTasks = new HashMap<String, StreamsProviderTask>();
tasks = new HashMap<String, List<StreamsTask>>();
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e091c6cc/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java
new file mode 100644
index 0000000..f8d6343
--- /dev/null
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java
@@ -0,0 +1,45 @@
+package org.apache.streams.local.executors;
+
+import org.apache.streams.local.builders.LocalStreamBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.*;
+
+/**
+ * @see {@link java.util.concurrent.ThreadPoolExecutor}
+ */
+public class ShutdownStreamOnUnhandleThrowableThreadPoolExecutor extends ThreadPoolExecutor {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.class);
+
+ private LocalStreamBuilder streamBuilder;
+ private volatile boolean isStoped;
+
+ /**
+ * Creates a fixed size thread pool where corePoolSize & maximumPoolSize equal numThreads with an unbounded queue.
+ * @param numThreads number of threads in pool
+ * @param streamBuilder streambuilder to call {@link org.apache.streams.core.StreamBuilder#stop()} on upon receiving an unhandled throwable
+ */
+ public ShutdownStreamOnUnhandleThrowableThreadPoolExecutor(int numThreads, LocalStreamBuilder streamBuilder) {
+ super(numThreads, numThreads, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
+ this.streamBuilder = streamBuilder;
+ this.isStoped = false;
+ }
+
+ @Override
+ protected void afterExecute(Runnable r, Throwable t) {
+ if(t != null) {
+ LOGGER.error("Runnable, {}, exited with an unhandled throwable! : {}", r.getClass(), t);
+ LOGGER.error("Attempting to shut down stream.");
+ synchronized (this) {
+ if (!this.isStoped) {
+ this.isStoped = true;
+ this.streamBuilder.stop();
+ }
+ }
+ } else {
+ LOGGER.trace("Runnable, {}, finished executing.", r.getClass());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e091c6cc/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandledThrowableThreadPoolExecutorTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandledThrowableThreadPoolExecutorTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandledThrowableThreadPoolExecutorTest.java
new file mode 100644
index 0000000..17e8dd9
--- /dev/null
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandledThrowableThreadPoolExecutorTest.java
@@ -0,0 +1,103 @@
+package org.apache.streams.local.executors;
+
+import org.apache.streams.local.builders.LocalStreamBuilder;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ *
+ */
+public class ShutdownStreamOnUnhandledThrowableThreadPoolExecutorTest {
+
+
+ @Test
+ public void testShutDownOnException() {
+ LocalStreamBuilder sb = mock(LocalStreamBuilder.class);
+ final AtomicBoolean isShutdown = new AtomicBoolean(false);
+ doAnswer(new Answer() {
+ @Override
+ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+ isShutdown.set(true);
+ return null;
+ }
+ }).when(sb).stop();
+
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ Runnable runnable = new Runnable() {
+ @Override
+ public void run() {
+ latch.countDown();
+ throw new RuntimeException("Testing Throwable Handling!");
+ }
+ };
+
+ ExecutorService executor = new ShutdownStreamOnUnhandleThrowableThreadPoolExecutor(1, sb);
+ executor.execute(runnable);
+ try {
+ latch.await();
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ }
+ executor.shutdownNow();
+ try {
+ executor.awaitTermination(1, TimeUnit.SECONDS);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ }
+ assertTrue("Expected StreamBuilder shutdown to be called", isShutdown.get());
+ }
+
+
+ @Test
+ public void testNormalExecution() {
+ LocalStreamBuilder sb = mock(LocalStreamBuilder.class);
+ final AtomicBoolean isShutdown = new AtomicBoolean(false);
+ doAnswer(new Answer() {
+ @Override
+ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+ isShutdown.set(true);
+ return null;
+ }
+ }).when(sb).stop();
+
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ Runnable runnable = new Runnable() {
+ @Override
+ public void run() {
+ latch.countDown();
+ }
+ };
+
+ ExecutorService executor = new ShutdownStreamOnUnhandleThrowableThreadPoolExecutor(1, sb);
+ executor.execute(runnable);
+ try {
+ latch.await();
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ }
+ executor.shutdownNow();
+ try {
+ executor.awaitTermination(1, TimeUnit.SECONDS);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ }
+ assertFalse("Expected StreamBuilder shutdown to be called", isShutdown.get());
+ }
+
+
+}
[08/17] git commit: Merge pull request #4 from apache/master
Posted by sb...@apache.org.
Merge pull request #4 from apache/master
Merge Apache Master
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/507e6796
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/507e6796
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/507e6796
Branch: refs/heads/STREAMS-170
Commit: 507e67962e97e5e6b4c1de0bc0789c25804cd6af
Parents: dca6475 dc5e7de
Author: Ryan Ebanks <ry...@raveldata.com>
Authored: Fri Sep 19 11:03:37 2014 -0500
Committer: Ryan Ebanks <ry...@raveldata.com>
Committed: Fri Sep 19 11:03:37 2014 -0500
----------------------------------------------------------------------
README.txt | 2 +-
pom.xml | 2 +-
streams-contrib/pom.xml | 1 +
.../streams/console/ConsolePersistWriter.java | 5 +-
.../ElasticsearchConfigurator.java | 41 +-
.../ElasticsearchPersistDeleter.java | 34 +-
.../ElasticsearchPersistUpdater.java | 66 +-
.../ElasticsearchPersistWriter.java | 65 +-
.../ElasticsearchWriterConfiguration.json | 6 +-
.../streams-processor-jackson/pom.xml | 86 ++
.../CleanAdditionalPropertiesProcessor.java | 62 +
.../regex/AbstractRegexExtensionExtractor.java | 13 +-
.../org/apache/streams/urls/LinkResolver.java | 6 +
.../streams-provider-datasift/pom.xml | 21 +-
.../streams/datasift/csdl/DatasiftCsdlUtil.java | 6 +-
.../DatasiftTypeConverterProcessor.java | 154 +++
.../provider/DatasiftManagedSourceSetup.java | 76 ++
.../provider/DatasiftStreamConfigurator.java | 15 +-
.../DatasiftTypeConverterProcessor.java | 167 ---
.../serializer/DatasiftActivitySerializer.java | 5 +-
.../DatasiftDefaultActivitySerializer.java | 13 +-
.../DatasiftInstagramActivitySerializer.java | 118 ++
.../DatasiftTweetActivitySerializer.java | 6 +
.../main/jsonschema/com/datasift/Datasift.json | 1103 ------------------
.../com/datasift/DatasiftConfiguration.json | 22 -
.../com/datasift/DatasiftPushConfiguration.json | 17 -
.../datasift/DatasiftStreamConfiguration.json | 17 -
.../com/datasift/DatasiftTwitterUser.json | 68 --
.../org/apache/streams/datasift/Datasift.json | 458 ++++++++
.../streams/datasift/DatasiftConfiguration.json | 133 +++
.../datasift/DatasiftPushConfiguration.json | 17 +
.../datasift/DatasiftStreamConfiguration.json | 17 +
.../datasift/facebook/DatasiftFacebook.json | 120 ++
.../datasift/instagram/DatasiftInstagram.json | 178 +++
.../interaction/DatasiftInteraction.json | 92 ++
.../datasift/twitter/DatasiftTwitter.json | 365 ++++++
.../datasift/twitter/DatasiftTwitterMedia.json | 126 ++
.../datasift/twitter/DatasiftTwitterUser.json | 68 ++
.../DatasiftTypeConverterProcessorTest.java | 5 +-
.../DatasiftActivitySerializerTest.java | 19 +-
.../test/resources/instagram_datasift_json.txt | 25 +
.../metadata/facebook_post.mup | 694 +++++++++++
.../metadata/facebook_post.png | Bin 0 -> 1038039 bytes
.../api/FacebookPostActivitySerializer.java | 29 +-
.../processor/FacebookTypeConverter.java | 12 +-
.../pagefeed/FacebookDataCollector.java | 138 +++
.../pagefeed/FacebookPageFeedDataCollector.java | 124 ++
.../pagefeed/FacebookPageFeedProvider.java | 28 +
.../provider/pagefeed/FacebookProvider.java | 139 +++
.../FacebookStreamsPostSerializer.java | 60 +
.../streams/facebook/FacebookConfiguration.json | 33 +
.../provider/pagefeed/TestFacebookProvider.java | 94 ++
.../processor/InstagramTypeConverter.java | 19 +-
.../provider/InstagramAbstractProvider.java | 196 ++++
.../provider/InstagramDataCollector.java | 145 +++
.../provider/InstagramRecentMediaCollector.java | 171 ---
.../provider/InstagramRecentMediaProvider.java | 201 ----
.../InstagramRecentMediaCollector.java | 111 ++
.../InstagramRecentMediaProvider.java | 60 +
.../userinfo/InstagramUserInfoCollector.java | 92 ++
.../userinfo/InstagramUserInfoProvider.java | 39 +
.../serializer/InstagramUserInfoSerializer.java | 83 ++
.../InstagramRecentMediaCollectorTest.java | 159 ---
.../InstagramRecentMediaProviderTest.java | 175 ---
.../InstagramRecentMediaCollectorTest.java | 156 +++
.../InstagramRecentMediaProviderTest.java | 174 +++
.../InstagramUserInfoCollectorTest.java | 120 ++
.../apache/streams/data/util/MoreoverUtils.java | 2 +-
.../SysomosBeatActivityConverter.java | 7 +-
.../FetchAndReplaceTwitterProcessor.java | 4 +-
.../twitter/provider/TwitterConfigurator.java | 70 ++
.../provider/TwitterStreamConfigurator.java | 99 --
.../twitter/provider/TwitterStreamProvider.java | 2 +-
.../provider/TwitterTimelineProvider.java | 8 +-
.../TwitterUserInformationProvider.java | 4 +-
.../apache/streams/core/util/DatumUtils.java | 49 +
streams-pojo/metadata/Serializer.mup | 371 ++++++
streams-pojo/metadata/serializer.png | Bin 0 -> 604337 bytes
.../org/apache/streams/data/util/JsonUtil.java | 8 +-
.../jackson/StreamsDateTimeSerializer.java | 4 -
.../streams/jackson/StreamsJacksonMapper.java | 10 -
.../jackson/StreamsPeriodDeserializer.java | 1 -
.../jackson/StreamsPeriodSerializer.java | 2 -
.../org/apache/streams/pojo/json/activity.json | 3 +-
.../data/data/util/DateTimeSerDeTest.java | 2 -
streams-runtimes/streams-runtime-local/pom.xml | 6 +
.../local/tasks/StreamsPersistWriterTask.java | 2 +
.../local/tasks/StreamsProcessorTask.java | 6 +-
.../local/tasks/StreamsProviderTask.java | 19 +-
.../local/builders/LocalStreamBuilderTest.java | 37 +-
.../local/builders/ToyLocalBuilderExample.java | 6 +-
.../streams/local/tasks/BasicTasksTest.java | 6 +-
.../local/tasks/StreamsProviderTaskTest.java | 147 +++
.../test/processors/DoNothingProcessor.java | 2 +-
.../PassthroughDatumCounterProcessor.java | 2 +-
.../test/providers/NumericMessageProvider.java | 2 +-
.../local/test/writer/DatumCounterWriter.java | 2 +-
.../local/test/writer/DoNothingWriter.java | 2 +-
.../local/test/writer/SystemOutWriter.java | 2 +-
.../apache/streams/util/SerializationUtil.java | 4 +-
100 files changed, 5544 insertions(+), 2419 deletions(-)
----------------------------------------------------------------------
[12/17] git commit: STREAMS-180 | Missing javadoc
Posted by sb...@apache.org.
STREAMS-180 | Missing javadoc
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/ef2e065f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/ef2e065f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/ef2e065f
Branch: refs/heads/STREAMS-170
Commit: ef2e065f79e3b050f94ead4c37b3d04f0e6f0059
Parents: 119608d
Author: Robert Douglas <rd...@w2odigital.com>
Authored: Mon Sep 22 16:50:01 2014 -0500
Committer: Robert Douglas <rd...@w2odigital.com>
Committed: Mon Sep 22 16:50:01 2014 -0500
----------------------------------------------------------------------
.../instagram/serializer/util/InstagramActivityUtil.java | 5 +++++
1 file changed, 5 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ef2e065f/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/util/InstagramActivityUtil.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/util/InstagramActivityUtil.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/util/InstagramActivityUtil.java
index c210be8..5bbbb35 100644
--- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/util/InstagramActivityUtil.java
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/util/InstagramActivityUtil.java
@@ -85,6 +85,11 @@ public class InstagramActivityUtil {
activity.setProvider(getProvider());
}
+ /**
+ * Builds an Actor object given a UserInfoData object
+ * @param item
+ * @return Actor object
+ */
public static Actor buildActor(UserInfoData item) {
Actor actor = new Actor();
[05/17] git commit: refactored to move duplicated code to new protected methods in ElasticsearchPersistWriter
Posted by sb...@apache.org.
refactored to move duplicated code to new protected methods in ElasticsearchPersistWriter
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/5bf87552
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/5bf87552
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/5bf87552
Branch: refs/heads/STREAMS-170
Commit: 5bf8755236ff3fe53eae7b0f131bfbf57647ba29
Parents: 9c98a45
Author: sblackmon <sb...@apache.org>
Authored: Tue Sep 16 13:59:20 2014 -0500
Committer: sblackmon <sb...@apache.org>
Committed: Tue Sep 16 13:59:20 2014 -0500
----------------------------------------------------------------------
.../ElasticsearchPersistDeleter.java | 20 +------
.../ElasticsearchPersistUpdater.java | 20 +------
.../ElasticsearchPersistWriter.java | 62 ++++++++++++++------
3 files changed, 51 insertions(+), 51 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5bf87552/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java
index 95004f5..319cece 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java
@@ -51,23 +51,9 @@ public class ElasticsearchPersistDeleter extends ElasticsearchPersistWriter impl
LOGGER.debug("Delete Metadata: {}", metadata);
- String index = null;
- String type = null;
- String id = streamsDatum.getId();
-
- if( metadata != null && metadata.containsKey("index"))
- index = (String) streamsDatum.getMetadata().get("index");
- if( metadata != null && metadata.containsKey("type"))
- type = (String) streamsDatum.getMetadata().get("type");
- if( id == null && metadata != null && metadata.containsKey("id"))
- id = (String) streamsDatum.getMetadata().get("id");
-
- if(index == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) {
- index = config.getIndex();
- }
- if(type == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) {
- type = config.getType();
- }
+ String index = getIndex(metadata, config);
+ String type = getType(metadata, config);
+ String id = getId(streamsDatum);
try {
delete(index, type, id);
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5bf87552/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
index d0e8acd..872c65e 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
@@ -51,23 +51,9 @@ public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter impl
LOGGER.debug("Update Metadata: {}", metadata);
- String index = null;
- String type = null;
- String id = streamsDatum.getId();
-
- if( metadata != null && metadata.containsKey("index"))
- index = (String) streamsDatum.getMetadata().get("index");
- if( metadata != null && metadata.containsKey("type"))
- type = (String) streamsDatum.getMetadata().get("type");
- if( id == null && metadata != null && metadata.containsKey("id"))
- id = (String) streamsDatum.getMetadata().get("id");
-
- if(index == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) {
- index = config.getIndex();
- }
- if(type == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) {
- type = config.getType();
- }
+ String index = getIndex(metadata, config);
+ String type = getType(metadata, config);
+ String id = getId(streamsDatum);
String json;
try {
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5bf87552/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
index ca643b9..4ec3315 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
@@ -145,23 +145,9 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt
LOGGER.debug("Write Metadata: {}", metadata);
- String index = null;
- String type = null;
- String id = streamsDatum.getId();
-
- if( metadata != null && metadata.containsKey("index"))
- index = (String) streamsDatum.getMetadata().get("index");
- if( metadata != null && metadata.containsKey("type"))
- type = (String) streamsDatum.getMetadata().get("type");
- if( id == null && metadata != null && metadata.containsKey("id"))
- id = (String) streamsDatum.getMetadata().get("id");
-
- if(index == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) {
- index = config.getIndex();
- }
- if(type == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) {
- type = config.getType();
- }
+ String index = getIndex(metadata, config);
+ String type = getType(metadata, config);
+ String id = getId(streamsDatum);
try {
add(index, type, id,
@@ -506,4 +492,46 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt
MEGABYTE_FORMAT.format(sizeInBytes / (double) (1024 * 1024)), NUMBER_FORMAT.format(passed), NUMBER_FORMAT.format(failed), NUMBER_FORMAT.format(millis),
MEGABYTE_FORMAT.format((double) totalSizeInBytes.get() / (double) (1024 * 1024)), NUMBER_FORMAT.format(totalOk), NUMBER_FORMAT.format(totalFailed), NUMBER_FORMAT.format(totalSeconds), NUMBER_FORMAT.format(getTotalOutstanding()));
}
+
+ protected String getIndex(Map<String, Object> metadata, ElasticsearchWriterConfiguration config) {
+
+ String index = null;
+
+ if( metadata != null && metadata.containsKey("index"))
+ index = (String) metadata.get("index");
+
+ if(index == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) {
+ index = config.getIndex();
+ }
+
+ return index;
+ }
+
+ protected String getType(Map<String, Object> metadata, ElasticsearchWriterConfiguration config) {
+
+ String type = null;
+
+ if( metadata != null && metadata.containsKey("type"))
+ type = (String) metadata.get("type");
+
+ if(type == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) {
+ type = config.getType();
+ }
+
+
+ return type;
+ }
+
+ protected String getId(StreamsDatum datum) {
+
+ String id = datum.getId();
+
+ Map<String, Object> metadata = datum.getMetadata();
+
+ if( id == null && metadata != null && metadata.containsKey("id"))
+ id = (String) datum.getMetadata().get("id");
+
+ return id;
+ }
+
}