You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/11/18 18:04:47 UTC
incubator-streams git commit: These classes were deleted prematurely,
causing unnecessary breaking changes. They should be deprecated as part
of STREAMS-218 and deleted in the next minor version.
Repository: incubator-streams
Updated Branches:
refs/heads/STREAMS-212.3 [created] 91dd9a3c5
These classes were deleted prematurely, causing unnecessary breaking changes. They should be deprecated as part of STREAMS-218 and deleted in the next minor version.
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/91dd9a3c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/91dd9a3c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/91dd9a3c
Branch: refs/heads/STREAMS-212.3
Commit: 91dd9a3c5a489239edffb341f4aea89c919a4b33
Parents: d0b5a0a
Author: sblackmon <sb...@apache.org>
Authored: Tue Nov 18 10:59:53 2014 -0600
Committer: sblackmon <sb...@apache.org>
Committed: Tue Nov 18 10:59:53 2014 -0600
----------------------------------------------------------------------
.../processor/TwitterEventProcessor.java | 194 +++++++++++++++++
.../twitter/processor/TwitterTypeConverter.java | 209 +++++++++++++++++++
2 files changed, 403 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/91dd9a3c/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java
new file mode 100644
index 0000000..fb4615f
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.twitter.processor;
+
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.exceptions.ActivitySerializerException;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.twitter.pojo.Delete;
+import org.apache.streams.twitter.pojo.Retweet;
+import org.apache.streams.twitter.pojo.Tweet;
+import org.apache.streams.twitter.provider.TwitterEventClassifier;
+import org.apache.streams.twitter.serializer.*;
+import org.apache.streams.util.ComponentUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+
+/**
+ * Converts Twitter events held as Jackson {@link ObjectNode} documents into the output
+ * type selected at construction time.
+ *
+ * <p>{@link #prepare(Object)} has to run before {@link #process(StreamsDatum)} or
+ * {@link #convert(ObjectNode, Class, Class)}: it creates the activity serializer that
+ * {@code convert} requires to be non-null.
+ *
+ * <p>NOTE(review): the commit that restored this class describes it as slated for
+ * deprecation under STREAMS-218 - confirm before adding new callers.
+ *
+ * Created by sblackmon on 12/10/13.
+ */
+public class TwitterEventProcessor implements StreamsProcessor {
+
+    private final static String STREAMS_ID = "TwitterEventProcessor";
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(TwitterEventProcessor.class);
+
+    // Replaced with a StreamsJacksonMapper by prepare().
+    private ObjectMapper mapper = new StreamsTwitterMapper();
+
+    // Stored by the constructor; process() classifies each event itself and does not read it.
+    private Class inClass;
+    private Class outClass;
+
+    // Created in prepare(); convert() refuses to run while this is still null.
+    private TwitterJsonActivitySerializer twitterJsonActivitySerializer;
+
+    /**
+     * @param inClass  expected input event class (may be null)
+     * @param outClass class of the documents {@link #process(StreamsDatum)} should emit
+     */
+    public TwitterEventProcessor(Class inClass, Class outClass) {
+        this.inClass = inClass;
+        this.outClass = outClass;
+    }
+
+    /**
+     * Creates a processor with no declared input class.
+     *
+     * @param outClass class of the documents {@link #process(StreamsDatum)} should emit
+     */
+    public TwitterEventProcessor( Class outClass) {
+        this(null, outClass);
+    }
+
+    /**
+     * Converts a single event to {@code outClass}.
+     *
+     * <p>{@code Activity} and {@code ObjectNode} targets are produced for any input class;
+     * {@code Tweet}, {@code Retweet} and {@code Delete} targets are only produced when
+     * {@code inClass} is that same class.
+     *
+     * @param event    the event to convert; must not be null
+     * @param inClass  detected class of the event; null is treated as "no match"
+     * @param outClass requested target class
+     * @return the converted object, or null when no supported conversion applies
+     * @throws ActivitySerializerException if the activity serializer rejects the event
+     * @throws JsonProcessingException     if the event cannot be written as a JSON string
+     * @throws NullPointerException        if {@code event} is null or {@link #prepare(Object)}
+     *                                     has not been called
+     */
+    public Object convert(ObjectNode event, Class inClass, Class outClass) throws ActivitySerializerException, JsonProcessingException {
+
+        Preconditions.checkNotNull(event);
+        Preconditions.checkNotNull(mapper);
+        Preconditions.checkNotNull(twitterJsonActivitySerializer);
+
+        Object result = null;
+
+        // Class literals sit on the left of equals() so that a null inClass/outClass
+        // simply fails to match instead of raising a NullPointerException.
+        if( Activity.class.equals( outClass )) {
+            LOGGER.debug("ACTIVITY");
+            result = twitterJsonActivitySerializer.deserialize(
+                    mapper.writeValueAsString(event));
+        } else if( Tweet.class.equals( outClass )) {
+            if ( Tweet.class.equals( inClass )) {
+                LOGGER.debug("TWEET");
+                result = mapper.convertValue(event, Tweet.class);
+            }
+        } else if( Retweet.class.equals( outClass )) {
+            if ( Retweet.class.equals( inClass )) {
+                LOGGER.debug("RETWEET");
+                result = mapper.convertValue(event, Retweet.class);
+            }
+        } else if( Delete.class.equals( outClass )) {
+            if ( Delete.class.equals( inClass )) {
+                LOGGER.debug("DELETE");
+                result = mapper.convertValue(event, Delete.class);
+            }
+        } else if( ObjectNode.class.equals( outClass )) {
+            LOGGER.debug("OBJECTNODE");
+            result = mapper.convertValue(event, ObjectNode.class);
+        }
+
+        if( result == null ) {
+            // no supported conversion was applied
+            LOGGER.debug("CONVERT FAILED");
+        }
+
+        return result;
+
+    }
+
+    /**
+     * Placeholder validation hook; currently accepts every document.
+     *
+     * @param document the converted document
+     * @param klass    the class the document was converted to
+     * @return always true
+     */
+    public boolean validate(Object document, Class klass) {
+
+        // TODO
+        return true;
+    }
+
+    /**
+     * Checks whether a string can be tokenized as JSON from start to end.
+     *
+     * @param json the candidate text; null is reported as invalid
+     * @return true when the whole input parses without error, false otherwise
+     */
+    public boolean isValidJSON(final String json) {
+        if (json == null) {
+            return false;
+        }
+
+        boolean valid = false;
+        JsonParser parser = null;
+        try {
+            parser = new ObjectMapper().getJsonFactory()
+                    .createJsonParser(json);
+            // drain every token; a malformed document throws part-way through
+            while (parser.nextToken() != null) {
+            }
+            valid = true;
+        } catch (JsonParseException jpe) {
+            // warn(String, Throwable) does not expand "{}", so the message carries no placeholder
+            LOGGER.warn("validate: malformed JSON", jpe);
+        } catch (IOException ioe) {
+            LOGGER.warn("validate: unable to read JSON", ioe);
+        } finally {
+            if (parser != null) {
+                try {
+                    parser.close();
+                } catch (IOException ignored) {
+                    // nothing useful can be done if closing the parser fails
+                }
+            }
+        }
+
+        return valid;
+    }
+
+    /**
+     * Converts the datum's {@link ObjectNode} document to the configured output class.
+     *
+     * @param entry datum whose document is expected to be an {@link ObjectNode}
+     * @return a single-element list holding the converted datum, or an empty list when the
+     *         document is not an {@code ObjectNode}, cannot be serialized, or cannot be converted
+     */
+    @Override
+    public List<StreamsDatum> process(StreamsDatum entry) {
+
+        Object document = entry.getDocument();
+
+        // Anything other than an ObjectNode (including null) is dropped like every other
+        // unconvertible input, rather than failing on the cast below.
+        if (!(document instanceof ObjectNode)) {
+            LOGGER.warn("{} cannot process document of type {}", STREAMS_ID,
+                    document == null ? null : document.getClass());
+            return Lists.newArrayList();
+        }
+
+        ObjectNode node = (ObjectNode) document;
+
+        LOGGER.debug("{} processing {}", STREAMS_ID, node.getClass());
+
+        String json = null;
+        try {
+            json = mapper.writeValueAsString(node);
+        } catch (JsonProcessingException e) {
+            LOGGER.warn("Failed serializing document to JSON", e);
+        }
+
+        if( StringUtils.isNotEmpty(json)) {
+
+            // since data is coming from outside provider, we don't know what type the events are
+            Class inClass = TwitterEventClassifier.detectClass(json);
+
+            // if the target is string, just pass-through
+            if (java.lang.String.class.equals(outClass))
+                return Lists.newArrayList(new StreamsDatum(json));
+            else {
+                // convert to desired format
+                Object out = null;
+                try {
+                    out = convert(node, inClass, outClass);
+                } catch (ActivitySerializerException e) {
+                    LOGGER.warn("Failed deserializing", e);
+                    return Lists.newArrayList();
+                } catch (JsonProcessingException e) {
+                    LOGGER.warn("Failed parsing JSON", e);
+                    return Lists.newArrayList();
+                }
+
+                if (out != null && validate(out, outClass))
+                    return Lists.newArrayList(new StreamsDatum(out));
+            }
+        }
+
+        return Lists.newArrayList();
+
+    }
+
+    /**
+     * Creates the mapper and activity serializer used by this processor.
+     *
+     * @param configurationObject not read by this implementation
+     */
+    @Override
+    public void prepare(Object configurationObject) {
+        mapper = new StreamsJacksonMapper();
+        twitterJsonActivitySerializer = new TwitterJsonActivitySerializer();
+    }
+
+    /** No resources to release. */
+    @Override
+    public void cleanUp() {
+
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/91dd9a3c/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
new file mode 100644
index 0000000..74cce27
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.twitter.processor;
+
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.exceptions.ActivitySerializerException;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.twitter.pojo.Delete;
+import org.apache.streams.twitter.pojo.Retweet;
+import org.apache.streams.twitter.pojo.Tweet;
+import org.apache.streams.twitter.provider.TwitterEventClassifier;
+import org.apache.streams.twitter.serializer.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Queue;
+
+/**
+ * Converts Twitter events, supplied either as JSON strings or as Jackson
+ * {@link ObjectNode} documents, into the output type selected at construction time.
+ *
+ * <p>{@link #prepare(Object)} has to run before {@link #process(StreamsDatum)}: it creates
+ * the mapper and the activity serializer the conversion relies on.
+ *
+ * <p>NOTE(review): the commit that restored this class describes it as slated for
+ * deprecation under STREAMS-218 - confirm before adding new callers.
+ *
+ * Created by sblackmon on 12/10/13.
+ */
+public class TwitterTypeConverter implements StreamsProcessor {
+
+    public final static String STREAMS_ID = "TwitterTypeConverter";
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(TwitterTypeConverter.class);
+
+    // Created in prepare().
+    private ObjectMapper mapper;
+
+    // Only touched by the accessor/mutator below; process() does not use the queues.
+    private Queue<StreamsDatum> inQueue;
+    private Queue<StreamsDatum> outQueue;
+
+    // inClass is stored by the constructor; process() classifies each event itself.
+    private Class inClass;
+    private Class outClass;
+
+    // Created in prepare().
+    private TwitterJsonActivitySerializer twitterJsonActivitySerializer;
+
+    // Number of successful conversions performed by convert().
+    private int count = 0;
+
+    // Kept as a distinct String instance rather than a literal.
+    // NOTE(review): presumably so consumers can recognise the marker by identity - confirm
+    // against callers before simplifying.
+    public final static String TERMINATE = new String("TERMINATE");
+
+    /**
+     * @param inClass  expected input event class
+     * @param outClass class of the documents {@link #process(StreamsDatum)} should emit
+     */
+    public TwitterTypeConverter(Class inClass, Class outClass) {
+        this.inClass = inClass;
+        this.outClass = outClass;
+    }
+
+    /**
+     * @return the output queue field; nothing in this class assigns it
+     */
+    public Queue<StreamsDatum> getProcessorOutputQueue() {
+        return outQueue;
+    }
+
+    /**
+     * @param inputQueue queue to store as this processor's input queue
+     */
+    public void setProcessorInputQueue(Queue<StreamsDatum> inputQueue) {
+        inQueue = inputQueue;
+    }
+
+    /**
+     * Converts a single event to {@code outClass} and counts successful conversions.
+     *
+     * <p>{@code Activity} and {@code ObjectNode} targets are produced for any input class;
+     * {@code Tweet}, {@code Retweet} and {@code Delete} targets are only produced when
+     * {@code inClass} is that same class.
+     *
+     * @param event    the event to convert
+     * @param inClass  detected class of the event; null is treated as "no match"
+     * @param outClass requested target class
+     * @return the converted object, or null when no supported conversion applies
+     * @throws ActivitySerializerException if the activity serializer rejects the event
+     * @throws JsonProcessingException     if the event cannot be written as a JSON string
+     */
+    public Object convert(ObjectNode event, Class inClass, Class outClass) throws ActivitySerializerException, JsonProcessingException {
+
+        Object result = null;
+
+        // Class literals sit on the left of equals() so that a null inClass/outClass
+        // simply fails to match instead of raising a NullPointerException.
+        if( Activity.class.equals( outClass )) {
+            LOGGER.debug("ACTIVITY");
+            result = twitterJsonActivitySerializer.deserialize(
+                    mapper.writeValueAsString(event));
+        } else if( Tweet.class.equals( outClass )) {
+            if ( Tweet.class.equals( inClass )) {
+                LOGGER.debug("TWEET");
+                result = mapper.convertValue(event, Tweet.class);
+            }
+        } else if( Retweet.class.equals( outClass )) {
+            if ( Retweet.class.equals( inClass )) {
+                LOGGER.debug("RETWEET");
+                result = mapper.convertValue(event, Retweet.class);
+            }
+        } else if( Delete.class.equals( outClass )) {
+            if ( Delete.class.equals( inClass )) {
+                LOGGER.debug("DELETE");
+                result = mapper.convertValue(event, Delete.class);
+            }
+        } else if( ObjectNode.class.equals( outClass )) {
+            LOGGER.debug("OBJECTNODE");
+            result = mapper.convertValue(event, ObjectNode.class);
+        }
+
+        if( result != null ) {
+            count ++;
+            return result;
+        }
+
+        // no supported conversion was applied
+        LOGGER.debug("CONVERT FAILED");
+
+        return null;
+
+    }
+
+    /**
+     * Placeholder validation hook; currently accepts every document.
+     *
+     * @param document the converted document
+     * @param klass    the class the document was converted to
+     * @return always true
+     */
+    public boolean validate(Object document, Class klass) {
+
+        // TODO
+        return true;
+    }
+
+    /**
+     * Checks whether a string can be tokenized as JSON from start to end.
+     *
+     * @param json the candidate text; null is reported as invalid
+     * @return true when the whole input parses without error, false otherwise
+     */
+    public boolean isValidJSON(final String json) {
+        if (json == null) {
+            return false;
+        }
+
+        boolean valid = false;
+        JsonParser parser = null;
+        try {
+            parser = new ObjectMapper().getJsonFactory()
+                    .createJsonParser(json);
+            // drain every token; a malformed document throws part-way through
+            while (parser.nextToken() != null) {
+            }
+            valid = true;
+        } catch (JsonParseException jpe) {
+            // warn(String, Throwable) does not expand "{}", so the message carries no placeholder
+            LOGGER.warn("validate: malformed JSON", jpe);
+        } catch (IOException ioe) {
+            LOGGER.warn("validate: unable to read JSON", ioe);
+        } finally {
+            if (parser != null) {
+                try {
+                    parser.close();
+                } catch (IOException ignored) {
+                    // nothing useful can be done if closing the parser fails
+                }
+            }
+        }
+
+        return valid;
+    }
+
+    /**
+     * Converts the datum's document to the configured output class.
+     *
+     * <p>String documents are passed through unchanged when the output class is
+     * {@code String}; otherwise they are parsed as JSON and converted. {@link ObjectNode}
+     * documents are converted directly. Documents of any other type yield no output.
+     *
+     * @param entry the datum to convert
+     * @return a single-element list holding the result, or an empty list when the document
+     *         is unsupported, cannot be converted, or any exception occurs while processing
+     */
+    @Override
+    public List<StreamsDatum> process(StreamsDatum entry) {
+
+        StreamsDatum result = null;
+
+        try {
+
+            Object item = entry.getDocument();
+            ObjectNode node;
+
+            LOGGER.debug("{} processing {}", STREAMS_ID, item.getClass());
+
+            if( item instanceof String ) {
+
+                // if the target is string, just pass-through
+                if( String.class.equals(outClass)) {
+                    result = entry;
+                }
+                else {
+                    // first check for valid json
+                    node = (ObjectNode)mapper.readTree((String)item);
+
+                    // since data is coming from outside provider, we don't know what type the events are
+                    Class inClass = TwitterEventClassifier.detectClass((String) item);
+
+                    Object out = convert(node, inClass, outClass);
+
+                    if( out != null && validate(out, outClass))
+                        result = new StreamsDatum(out);
+                }
+
+            } else if( item instanceof ObjectNode ) {
+
+                // first check for valid json
+                node = (ObjectNode)mapper.valueToTree(item);
+
+                // since data is coming from outside provider, we don't know what type the events are
+                Class inClass = TwitterEventClassifier.detectClass(mapper.writeValueAsString(item));
+
+                Object out = convert(node, inClass, outClass);
+
+                if( out != null && validate(out, outClass))
+                    result = new StreamsDatum(out);
+
+            }
+
+        } catch (Exception e) {
+            // Best-effort processing: the datum is dropped, but the failure goes through
+            // the logger instead of being printed straight to stderr.
+            LOGGER.warn(STREAMS_ID + " failed to process datum", e);
+        }
+
+        if( result != null )
+            return Lists.newArrayList(result);
+        else
+            return Lists.newArrayList();
+    }
+
+    /**
+     * Creates the mapper and activity serializer used by this processor.
+     *
+     * @param o not read by this implementation
+     */
+    @Override
+    public void prepare(Object o) {
+        mapper = new StreamsTwitterMapper();
+        twitterJsonActivitySerializer = new TwitterJsonActivitySerializer();
+    }
+
+    /** No resources to release. */
+    @Override
+    public void cleanUp() {
+
+    }
+
+}