You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/11/18 18:04:47 UTC

incubator-streams git commit: These classes were deleted prematurely, causing unnecessary breaking changes. They should be deprecated as part of STREAMS-218 and deleted in the next minor version.

Repository: incubator-streams
Updated Branches:
  refs/heads/STREAMS-212.3 [created] 91dd9a3c5


These classes were deleted prematurely, causing unnecessary breaking changes.  They should be deprecated as part of STREAMS-218 and deleted in the next minor version.


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/91dd9a3c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/91dd9a3c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/91dd9a3c

Branch: refs/heads/STREAMS-212.3
Commit: 91dd9a3c5a489239edffb341f4aea89c919a4b33
Parents: d0b5a0a
Author: sblackmon <sb...@apache.org>
Authored: Tue Nov 18 10:59:53 2014 -0600
Committer: sblackmon <sb...@apache.org>
Committed: Tue Nov 18 10:59:53 2014 -0600

----------------------------------------------------------------------
 .../processor/TwitterEventProcessor.java        | 194 +++++++++++++++++
 .../twitter/processor/TwitterTypeConverter.java | 209 +++++++++++++++++++
 2 files changed, 403 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/91dd9a3c/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java
new file mode 100644
index 0000000..fb4615f
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.twitter.processor;
+
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.exceptions.ActivitySerializerException;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.twitter.pojo.Delete;
+import org.apache.streams.twitter.pojo.Retweet;
+import org.apache.streams.twitter.pojo.Tweet;
+import org.apache.streams.twitter.provider.TwitterEventClassifier;
+import org.apache.streams.twitter.serializer.*;
+import org.apache.streams.util.ComponentUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+
+/**
+ * Created by sblackmon on 12/10/13.
+ */
+public class TwitterEventProcessor implements StreamsProcessor {
+
+    private final static String STREAMS_ID = "TwitterEventProcessor";
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(TwitterEventProcessor.class);
+
+    private ObjectMapper mapper = new StreamsTwitterMapper();
+
+    private Class inClass;
+    private Class outClass;
+
+    private TwitterJsonActivitySerializer twitterJsonActivitySerializer;
+
+    public TwitterEventProcessor(Class inClass, Class outClass) {
+        this.inClass = inClass;
+        this.outClass = outClass;
+    }
+
+    public TwitterEventProcessor( Class outClass) {
+        this(null, outClass);
+    }
+
+    public Object convert(ObjectNode event, Class inClass, Class outClass) throws ActivitySerializerException, JsonProcessingException {
+
+        Object result = null;
+
+        Preconditions.checkNotNull(event);
+        Preconditions.checkNotNull(mapper);
+        Preconditions.checkNotNull(twitterJsonActivitySerializer);
+
+        if( outClass.equals( Activity.class )) {
+                LOGGER.debug("ACTIVITY");
+                result = twitterJsonActivitySerializer.deserialize(
+                        mapper.writeValueAsString(event));
+        } else if( outClass.equals( Tweet.class )) {
+            if ( inClass.equals( Tweet.class )) {
+                LOGGER.debug("TWEET");
+                result = mapper.convertValue(event, Tweet.class);
+            }
+        } else if( outClass.equals( Retweet.class )) {
+            if ( inClass.equals( Retweet.class )) {
+                LOGGER.debug("RETWEET");
+                result = mapper.convertValue(event, Retweet.class);
+            }
+        } else if( outClass.equals( Delete.class )) {
+            if ( inClass.equals( Delete.class )) {
+                LOGGER.debug("DELETE");
+                result = mapper.convertValue(event, Delete.class);
+            }
+        } else if( outClass.equals( ObjectNode.class )) {
+            LOGGER.debug("OBJECTNODE");
+            result = mapper.convertValue(event, ObjectNode.class);
+        }
+
+            // no supported conversion were applied
+        if( result != null )
+            return result;
+
+        LOGGER.debug("CONVERT FAILED");
+
+        return null;
+
+    }
+
+    public boolean validate(Object document, Class klass) {
+
+        // TODO
+        return true;
+    }
+
+    public boolean isValidJSON(final String json) {
+        boolean valid = false;
+        try {
+            final JsonParser parser = new ObjectMapper().getJsonFactory()
+                    .createJsonParser(json);
+            while (parser.nextToken() != null) {
+            }
+            valid = true;
+        } catch (JsonParseException jpe) {
+            LOGGER.warn("validate: {}", jpe);
+        } catch (IOException ioe) {
+            LOGGER.warn("validate: {}", ioe);
+        }
+
+        return valid;
+    }
+
+    @Override
+    public List<StreamsDatum> process(StreamsDatum entry) {
+
+        // first check for valid json
+        ObjectNode node = (ObjectNode) entry.getDocument();
+
+        LOGGER.debug("{} processing {}", STREAMS_ID, node.getClass());
+
+        String json = null;
+        try {
+            json = mapper.writeValueAsString(node);
+        } catch (JsonProcessingException e) {
+            e.printStackTrace();
+        }
+
+        if( StringUtils.isNotEmpty(json)) {
+
+            // since data is coming from outside provider, we don't know what type the events are
+            Class inClass = TwitterEventClassifier.detectClass(json);
+
+            // if the target is string, just pass-through
+            if (java.lang.String.class.equals(outClass))
+                return Lists.newArrayList(new StreamsDatum(json));
+            else {
+                // convert to desired format
+                Object out = null;
+                try {
+                    out = convert(node, inClass, outClass);
+                } catch (ActivitySerializerException e) {
+                    LOGGER.warn("Failed deserializing", e);
+                    return Lists.newArrayList();
+                } catch (JsonProcessingException e) {
+                    LOGGER.warn("Failed parsing JSON", e);
+                    return Lists.newArrayList();
+                }
+
+                if (out != null && validate(out, outClass))
+                    return Lists.newArrayList(new StreamsDatum(out));
+            }
+        }
+
+        return Lists.newArrayList();
+
+    }
+
+    @Override
+    public void prepare(Object configurationObject) {
+        mapper = new StreamsJacksonMapper();
+        twitterJsonActivitySerializer = new TwitterJsonActivitySerializer();
+    }
+
+    @Override
+    public void cleanUp() {
+
+    }
+};

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/91dd9a3c/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
new file mode 100644
index 0000000..74cce27
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.twitter.processor;
+
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.exceptions.ActivitySerializerException;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.twitter.pojo.Delete;
+import org.apache.streams.twitter.pojo.Retweet;
+import org.apache.streams.twitter.pojo.Tweet;
+import org.apache.streams.twitter.provider.TwitterEventClassifier;
+import org.apache.streams.twitter.serializer.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Queue;
+
+/**
+ * Created by sblackmon on 12/10/13.
+ */
+public class TwitterTypeConverter implements StreamsProcessor {
+
+    public final static String STREAMS_ID = "TwitterTypeConverter";
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(TwitterTypeConverter.class);
+
+    private ObjectMapper mapper;
+
+    private Queue<StreamsDatum> inQueue;
+    private Queue<StreamsDatum> outQueue;
+
+    private Class inClass;
+    private Class outClass;
+
+    private TwitterJsonActivitySerializer twitterJsonActivitySerializer;
+
+    private int count = 0;
+
+    public final static String TERMINATE = new String("TERMINATE");
+
+    public TwitterTypeConverter(Class inClass, Class outClass) {
+        this.inClass = inClass;
+        this.outClass = outClass;
+    }
+
+    public Queue<StreamsDatum> getProcessorOutputQueue() {
+        return outQueue;
+    }
+
+    public void setProcessorInputQueue(Queue<StreamsDatum> inputQueue) {
+        inQueue = inputQueue;
+    }
+
+    public Object convert(ObjectNode event, Class inClass, Class outClass) throws ActivitySerializerException, JsonProcessingException {
+
+        Object result = null;
+
+        if( outClass.equals( Activity.class )) {
+            LOGGER.debug("ACTIVITY");
+            result = twitterJsonActivitySerializer.deserialize(
+                    mapper.writeValueAsString(event));
+        } else if( outClass.equals( Tweet.class )) {
+            if ( inClass.equals( Tweet.class )) {
+                LOGGER.debug("TWEET");
+                result = mapper.convertValue(event, Tweet.class);
+            }
+        } else if( outClass.equals( Retweet.class )) {
+            if ( inClass.equals( Retweet.class )) {
+                LOGGER.debug("RETWEET");
+                result = mapper.convertValue(event, Retweet.class);
+            }
+        } else if( outClass.equals( Delete.class )) {
+            if ( inClass.equals( Delete.class )) {
+                LOGGER.debug("DELETE");
+                result = mapper.convertValue(event, Delete.class);
+            }
+        } else if( outClass.equals( ObjectNode.class )) {
+            LOGGER.debug("OBJECTNODE");
+            result = mapper.convertValue(event, ObjectNode.class);
+        }
+
+        // no supported conversion were applied
+        if( result != null ) {
+            count ++;
+            return result;
+        }
+
+        LOGGER.debug("CONVERT FAILED");
+
+        return null;
+
+    }
+
+    public boolean validate(Object document, Class klass) {
+
+        // TODO
+        return true;
+    }
+
+    public boolean isValidJSON(final String json) {
+        boolean valid = false;
+        try {
+            final JsonParser parser = new ObjectMapper().getJsonFactory()
+                    .createJsonParser(json);
+            while (parser.nextToken() != null) {
+            }
+            valid = true;
+        } catch (JsonParseException jpe) {
+            LOGGER.warn("validate: {}", jpe);
+        } catch (IOException ioe) {
+            LOGGER.warn("validate: {}", ioe);
+        }
+
+        return valid;
+    }
+
+    @Override
+    public List<StreamsDatum> process(StreamsDatum entry) {
+
+        StreamsDatum result = null;
+
+        try {
+
+            Object item = entry.getDocument();
+            ObjectNode node;
+
+            LOGGER.debug("{} processing {}", STREAMS_ID, item.getClass());
+
+            if( item instanceof String ) {
+
+                // if the target is string, just pass-through
+                if( String.class.equals(outClass)) {
+                    result = entry;
+                }
+                else {
+                    // first check for valid json
+                    node = (ObjectNode)mapper.readTree((String)item);
+
+                    // since data is coming from outside provider, we don't know what type the events are
+                    Class inClass = TwitterEventClassifier.detectClass((String) item);
+
+                    Object out = convert(node, inClass, outClass);
+
+                    if( out != null && validate(out, outClass))
+                        result = new StreamsDatum(out);
+                }
+
+            } else if( item instanceof ObjectNode ) {
+
+                // first check for valid json
+                node = (ObjectNode)mapper.valueToTree(item);
+
+                // since data is coming from outside provider, we don't know what type the events are
+                Class inClass = TwitterEventClassifier.detectClass(mapper.writeValueAsString(item));
+
+                Object out = convert(node, inClass, outClass);
+
+                if( out != null && validate(out, outClass))
+                    result = new StreamsDatum(out);
+
+            }
+
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+
+        if( result != null )
+            return Lists.newArrayList(result);
+        else
+            return Lists.newArrayList();
+    }
+
+    @Override
+    public void prepare(Object o) {
+        mapper = new StreamsTwitterMapper();
+        twitterJsonActivitySerializer = new TwitterJsonActivitySerializer();
+    }
+
+    @Override
+    public void cleanUp() {
+
+    }
+
+}