You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/05/05 20:54:14 UTC

[27/52] [abbrv] git commit: merging @smashew's DateUtil ensuring processors utilize prepare method

merging @smashew's DateUtil
ensuring processors utilize prepare method


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/6b66b2ef
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/6b66b2ef
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/6b66b2ef

Branch: refs/heads/sblackmon
Commit: 6b66b2ef78df1f217dd0d5a4ba28ca0c4df99e04
Parents: b8fef9d
Author: sblackmon <sb...@w2odigital.com>
Authored: Wed Apr 2 18:34:11 2014 -0500
Committer: sblackmon <sb...@w2odigital.com>
Committed: Wed Apr 2 18:34:11 2014 -0500

----------------------------------------------------------------------
 .../ElasticsearchPersistReader.java             |  4 +++-
 .../ElasticsearchPersistReaderTask.java         |  3 ++-
 .../ElasticsearchPersistUpdater.java            |  3 ++-
 .../ElasticsearchPersistWriter.java             |  4 +++-
 .../jackson/StreamsPeriodDeserializer.java      | 21 ++++++++++++++++++
 .../jackson/StreamsPeriodSerializer.java        | 23 ++++++++++++++++++++
 6 files changed, 54 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6b66b2ef/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
index cc9b3fc..4fde58d 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
@@ -14,6 +14,7 @@ import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsPersistReader;
 import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.elasticsearch.action.search.SearchRequestBuilder;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.search.SearchType;
@@ -62,7 +63,7 @@ public class ElasticsearchPersistReader implements StreamsPersistReader, Iterabl
     private int batchSize = 100;
     private String scrollTimeout = null;
 
-    private ObjectMapper mapper = new ObjectMapper();
+    private ObjectMapper mapper;
 
     private ElasticsearchConfiguration config;
 
@@ -119,6 +120,7 @@ public class ElasticsearchPersistReader implements StreamsPersistReader, Iterabl
     @Override
     public void prepare(Object o) {
 
+        mapper = StreamsJacksonMapper.getInstance();
         persistQueue = new ConcurrentLinkedQueue<StreamsDatum>();
 
         // If we haven't already set up the search, then set up the search.

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6b66b2ef/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReaderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReaderTask.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReaderTask.java
index d3faa02..505dc01 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReaderTask.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReaderTask.java
@@ -3,6 +3,7 @@ package org.apache.streams.elasticsearch;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.elasticsearch.search.SearchHit;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -16,7 +17,7 @@ public class ElasticsearchPersistReaderTask implements Runnable {
 
     private ElasticsearchPersistReader reader;
 
-    private ObjectMapper mapper = new ObjectMapper();
+    private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
 
     public ElasticsearchPersistReaderTask(ElasticsearchPersistReader reader) {
         this.reader = reader;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6b66b2ef/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
index b55ad2c..5f59950 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
@@ -9,6 +9,7 @@ import com.typesafe.config.Config;
 import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsPersistWriter;
+import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
 import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
@@ -109,7 +110,7 @@ public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter impl
 
     protected volatile Queue<StreamsDatum> persistQueue;
 
-    private ObjectMapper mapper = new ObjectMapper();
+    private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
 
     private ElasticsearchConfiguration config;
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6b66b2ef/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
index ab35edd..0ff0ebb 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
@@ -7,6 +7,7 @@ import com.google.common.base.Preconditions;
 import com.typesafe.config.Config;
 import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.*;
+import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
 import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
@@ -98,7 +99,7 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
 
     protected volatile Queue<StreamsDatum> persistQueue;
 
-    private ObjectMapper mapper = new ObjectMapper();
+    private ObjectMapper mapper;
 
     private ElasticsearchWriterConfiguration config;
 
@@ -528,6 +529,7 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
 
     @Override
     public void prepare(Object configurationObject) {
+        mapper = StreamsJacksonMapper.getInstance();
         start();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6b66b2ef/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsPeriodDeserializer.java
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsPeriodDeserializer.java b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsPeriodDeserializer.java
new file mode 100644
index 0000000..ff765f6
--- /dev/null
+++ b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsPeriodDeserializer.java
@@ -0,0 +1,21 @@
+package org.apache.streams.jackson;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import org.apache.streams.data.util.RFC3339Utils;
+import org.joda.time.Period;
+
+import java.io.IOException;
+
+public class StreamsPeriodDeserializer extends StdDeserializer<Period>
+{
+
+    protected StreamsPeriodDeserializer(Class<Period> dateTimeClass) {
+        super(dateTimeClass);
+    }
+
+    public Period deserialize(JsonParser jpar, DeserializationContext context) throws IOException {
+        return Period.millis(jpar.getIntValue());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6b66b2ef/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsPeriodSerializer.java
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsPeriodSerializer.java b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsPeriodSerializer.java
new file mode 100644
index 0000000..614cbdd
--- /dev/null
+++ b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsPeriodSerializer.java
@@ -0,0 +1,23 @@
+package org.apache.streams.jackson;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.ser.std.StdSerializer;
+import org.apache.streams.data.util.RFC3339Utils;
+import org.joda.time.DateTime;
+import org.joda.time.Period;
+
+import java.io.IOException;
+
+public class StreamsPeriodSerializer extends StdSerializer<Period>
+{
+    protected StreamsPeriodSerializer(Class<Period> dateTimeClass) {
+        super(dateTimeClass);
+    }
+
+    @Override
+    public void serialize(Period value, JsonGenerator jgen, SerializerProvider provider) throws IOException
+    {
+        jgen.writeString(Integer.toString(value.getMillis()));
+    }
+}
\ No newline at end of file