You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/05/05 20:54:14 UTC
[27/52] [abbrv] git commit: merging @smashew's DateUtil ensuring
processors utilize prepare method
merging @smashew's DateUtil
ensuring processors utilize prepare method
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/6b66b2ef
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/6b66b2ef
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/6b66b2ef
Branch: refs/heads/sblackmon
Commit: 6b66b2ef78df1f217dd0d5a4ba28ca0c4df99e04
Parents: b8fef9d
Author: sblackmon <sb...@w2odigital.com>
Authored: Wed Apr 2 18:34:11 2014 -0500
Committer: sblackmon <sb...@w2odigital.com>
Committed: Wed Apr 2 18:34:11 2014 -0500
----------------------------------------------------------------------
.../ElasticsearchPersistReader.java | 4 +++-
.../ElasticsearchPersistReaderTask.java | 3 ++-
.../ElasticsearchPersistUpdater.java | 3 ++-
.../ElasticsearchPersistWriter.java | 4 +++-
.../jackson/StreamsPeriodDeserializer.java | 21 ++++++++++++++++++
.../jackson/StreamsPeriodSerializer.java | 23 ++++++++++++++++++++
6 files changed, 54 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6b66b2ef/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
index cc9b3fc..4fde58d 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
@@ -14,6 +14,7 @@ import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsPersistReader;
import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.jackson.StreamsJacksonMapper;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
@@ -62,7 +63,7 @@ public class ElasticsearchPersistReader implements StreamsPersistReader, Iterabl
private int batchSize = 100;
private String scrollTimeout = null;
- private ObjectMapper mapper = new ObjectMapper();
+ private ObjectMapper mapper;
private ElasticsearchConfiguration config;
@@ -119,6 +120,7 @@ public class ElasticsearchPersistReader implements StreamsPersistReader, Iterabl
@Override
public void prepare(Object o) {
+ mapper = StreamsJacksonMapper.getInstance();
persistQueue = new ConcurrentLinkedQueue<StreamsDatum>();
// If we haven't already set up the search, then set up the search.
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6b66b2ef/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReaderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReaderTask.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReaderTask.java
index d3faa02..505dc01 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReaderTask.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReaderTask.java
@@ -3,6 +3,7 @@ package org.apache.streams.elasticsearch;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.jackson.StreamsJacksonMapper;
import org.elasticsearch.search.SearchHit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -16,7 +17,7 @@ public class ElasticsearchPersistReaderTask implements Runnable {
private ElasticsearchPersistReader reader;
- private ObjectMapper mapper = new ObjectMapper();
+ private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
public ElasticsearchPersistReaderTask(ElasticsearchPersistReader reader) {
this.reader = reader;
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6b66b2ef/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
index b55ad2c..5f59950 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
@@ -9,6 +9,7 @@ import com.typesafe.config.Config;
import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsPersistWriter;
+import org.apache.streams.jackson.StreamsJacksonMapper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
@@ -109,7 +110,7 @@ public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter impl
protected volatile Queue<StreamsDatum> persistQueue;
- private ObjectMapper mapper = new ObjectMapper();
+ private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
private ElasticsearchConfiguration config;
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6b66b2ef/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
index ab35edd..0ff0ebb 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
@@ -7,6 +7,7 @@ import com.google.common.base.Preconditions;
import com.typesafe.config.Config;
import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.core.*;
+import org.apache.streams.jackson.StreamsJacksonMapper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
@@ -98,7 +99,7 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
protected volatile Queue<StreamsDatum> persistQueue;
- private ObjectMapper mapper = new ObjectMapper();
+ private ObjectMapper mapper;
private ElasticsearchWriterConfiguration config;
@@ -528,6 +529,7 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
@Override
public void prepare(Object configurationObject) {
+ mapper = StreamsJacksonMapper.getInstance();
start();
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6b66b2ef/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsPeriodDeserializer.java
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsPeriodDeserializer.java b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsPeriodDeserializer.java
new file mode 100644
index 0000000..ff765f6
--- /dev/null
+++ b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsPeriodDeserializer.java
@@ -0,0 +1,21 @@
+package org.apache.streams.jackson;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import org.apache.streams.data.util.RFC3339Utils;
+import org.joda.time.Period;
+
+import java.io.IOException;
+
+public class StreamsPeriodDeserializer extends StdDeserializer<Period>
+{
+
+ protected StreamsPeriodDeserializer(Class<Period> dateTimeClass) {
+ super(dateTimeClass);
+ }
+
+ public Period deserialize(JsonParser jpar, DeserializationContext context) throws IOException {
+ return Period.millis(jpar.getIntValue());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6b66b2ef/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsPeriodSerializer.java
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsPeriodSerializer.java b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsPeriodSerializer.java
new file mode 100644
index 0000000..614cbdd
--- /dev/null
+++ b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsPeriodSerializer.java
@@ -0,0 +1,23 @@
+package org.apache.streams.jackson;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.ser.std.StdSerializer;
+import org.apache.streams.data.util.RFC3339Utils;
+import org.joda.time.DateTime;
+import org.joda.time.Period;
+
+import java.io.IOException;
+
+public class StreamsPeriodSerializer extends StdSerializer<Period>
+{
+ protected StreamsPeriodSerializer(Class<Period> dateTimeClass) {
+ super(dateTimeClass);
+ }
+
+ @Override
+ public void serialize(Period value, JsonGenerator jgen, SerializerProvider provider) throws IOException
+ {
+ jgen.writeString(Integer.toString(value.getMillis()));
+ }
+}
\ No newline at end of file