You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sm...@apache.org on 2017/02/08 04:32:48 UTC

[2/9] incubator-streams git commit: STREAMS-463: Move every class in all repos underneath org.apache.streams, this closes apache/incubator-streams#356

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/provider/YoutubeUserActivityProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/provider/YoutubeUserActivityProvider.java b/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/provider/YoutubeUserActivityProvider.java
new file mode 100644
index 0000000..d4719ce
--- /dev/null
+++ b/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/provider/YoutubeUserActivityProvider.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.youtube.provider;
+
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfiguration;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.google.gplus.configuration.UserInfo;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
+import org.apache.streams.youtube.YoutubeConfiguration;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.api.services.youtube.YouTube;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Uninterruptibles;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
+import java.util.Iterator;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Retrieve recent activity from a list of user ids or names.
+ */
+public class YoutubeUserActivityProvider extends YoutubeProvider {
+
+  /**
+   * Creates a provider by delegating to the no-argument {@link YoutubeProvider} constructor.
+   */
+  public YoutubeUserActivityProvider() {
+    super();
+  }
+
+  /**
+   * Creates a provider by passing the supplied configuration to the
+   * {@link YoutubeProvider} constructor.
+   *
+   * @param config configuration handed to the superclass
+   */
+  public YoutubeUserActivityProvider(YoutubeConfiguration config) {
+    super(config);
+  }
+
+  /**
+   * Command-line entry point: runs the provider and writes every collected document to a
+   * file, one JSON document per line.
+   *
+   * <p>Supply (at least) the following required configuration in application.conf:
+   * <ul>
+   * <li>youtube.oauth.pathToP12KeyFile</li>
+   * <li>youtube.oauth.serviceAccountEmailAddress</li>
+   * <li>youtube.apiKey</li>
+   * <li>youtube.youtubeUsers</li>
+   * </ul>
+   *
+   * <p>Launch using:
+   *
+   * <p>mvn exec:java -Dexec.mainClass=org.apache.streams.youtube.provider.YoutubeUserActivityProvider -Dexec.args="application.conf youtube.json"
+   *
+   * @param args {@code args[0]} is the configuration file, {@code args[1]} is the output file
+   * @throws Exception if the arguments are invalid, the configuration cannot be parsed or the
+   *     output file cannot be opened
+   */
+  public static void main(String[] args) throws Exception {
+
+    Preconditions.checkArgument(args.length >= 2, "usage: <configfile> <outfile>");
+
+    String configfile = args[0];
+    String outfile = args[1];
+
+    Config reference = ConfigFactory.load();
+    File file = new File(configfile);
+    // A plain assert is skipped unless the JVM runs with -ea, so validate explicitly.
+    Preconditions.checkArgument(file.exists(), "configuration file does not exist: %s", configfile);
+    Config testResourceConfig = ConfigFactory.parseFileAnySyntax(file, ConfigParseOptions.defaults().setAllowMissing(false));
+
+    Config typesafe = testResourceConfig.withFallback(reference).resolve();
+
+    StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe);
+    YoutubeConfiguration config = new ComponentConfigurator<>(YoutubeConfiguration.class).detectConfiguration(typesafe, "youtube");
+    YoutubeUserActivityProvider provider = new YoutubeUserActivityProvider(config);
+
+    ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+    // try-with-resources closes (and thereby flushes) the output file even when collection fails.
+    // The JSON is written as UTF-8 instead of the platform default charset.
+    try (PrintStream outStream =
+             new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile)), false, "UTF-8")) {
+      provider.prepare(config);
+      provider.startStream();
+      do {
+        // Wait one batch interval, then drain whatever the provider has collected so far.
+        Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
+        Iterator<StreamsDatum> iterator = provider.readCurrent().iterator();
+        while (iterator.hasNext()) {
+          StreamsDatum datum = iterator.next();
+          String json;
+          try {
+            // Documents that are already strings are written verbatim; anything else is serialized.
+            if (datum.getDocument() instanceof String) {
+              json = (String) datum.getDocument();
+            } else {
+              json = mapper.writeValueAsString(datum.getDocument());
+            }
+            outStream.println(json);
+          } catch (JsonProcessingException ex) {
+            // Skip the document that cannot be serialized and keep draining the rest.
+            System.err.println(ex.getMessage());
+          }
+        }
+      }
+      while (provider.isRunning());
+      provider.cleanUp();
+      outStream.flush();
+    }
+  }
+
+  /**
+   * Creates the collector task for the given user info: a
+   * {@link YoutubeUserActivityCollector} constructed from the supplied client, queue,
+   * back-off strategy and user info, plus this provider's {@code config}.
+   *
+   * @param strategy back-off strategy passed through to the collector
+   * @param queue queue passed through to the collector
+   * @param youtube YouTube client passed through to the collector
+   * @param userInfo user info passed through to the collector
+   * @return a new {@link YoutubeUserActivityCollector}
+   */
+  @Override
+  protected Runnable getDataCollector(BackOffStrategy strategy, BlockingQueue<StreamsDatum> queue, YouTube youtube, UserInfo userInfo) {
+    return new YoutubeUserActivityCollector(youtube, queue, strategy, userInfo, config);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/serializer/YoutubeActivityUtil.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/serializer/YoutubeActivityUtil.java b/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/serializer/YoutubeActivityUtil.java
new file mode 100644
index 0000000..4b985f6
--- /dev/null
+++ b/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/serializer/YoutubeActivityUtil.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.youtube.serializer;
+
+import org.apache.streams.exceptions.ActivitySerializerException;
+import org.apache.streams.pojo.extensions.ExtensionUtil;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.ActivityObject;
+import org.apache.streams.pojo.json.Image;
+import org.apache.streams.pojo.json.Provider;
+
+import com.google.api.services.youtube.YouTube;
+import com.google.api.services.youtube.model.Channel;
+import com.google.api.services.youtube.model.Thumbnail;
+import com.google.api.services.youtube.model.ThumbnailDetails;
+import com.google.api.services.youtube.model.Video;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class YoutubeActivityUtil {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(YoutubeActivityUtil.class);
+
+  /**
+   * Given a {@link Video} object and an
+   * {@link Activity} object, fill out the appropriate details.
+   *
+   * <p>Sets the actor, the "post" verb, id, published date, title, content, url, provider,
+   * object and the YouTube extensions on the activity.
+   *
+   * @param video Video; its snippet must be populated
+   * @param activity Activity to fill in
+   * @param channelId channel id supplied by the caller; not read by this method, the actor id
+   *     is taken from the video's snippet instead
+   * @throws ActivitySerializerException ActivitySerializerException
+   */
+  public static void updateActivity(Video video, Activity activity, String channelId) throws ActivitySerializerException {
+    activity.setActor(buildActor(video, video.getSnippet().getChannelId()));
+    activity.setVerb("post");
+
+    // Optional.ofNullable(x).orElse(null) is just x, so the id is passed straight through.
+    activity.setId(formatId(activity.getVerb(), video.getId()));
+
+    activity.setPublished(new DateTime(video.getSnippet().getPublishedAt().getValue()));
+    activity.setTitle(video.getSnippet().getTitle());
+    activity.setContent(video.getSnippet().getDescription());
+    activity.setUrl("https://www.youtube.com/watch?v=" + video.getId());
+
+    activity.setProvider(getProvider());
+
+    activity.setObject(buildActivityObject(video));
+
+    addYoutubeExtensions(activity, video);
+  }
+
+
+  /**
+   * Populates an {@link Activity} from a {@link Channel}: sets the YouTube provider, the
+   * "post" verb and an actor built from the channel, then stores the channel itself under
+   * the "youtube" key of the activity's "extensions" additional property.
+   *
+   * @param channel Channel
+   * @param activity Activity
+   * @param channelId not read by this method
+   * @throws ActivitySerializerException wrapping anything thrown while populating the activity
+   */
+  public static void updateActivity(Channel channel, Activity activity, String channelId) throws ActivitySerializerException {
+    try {
+      activity.setProvider(getProvider());
+      activity.setVerb("post");
+      ActivityObject channelActor = createActorForChannel(channel);
+      activity.setActor(channelActor);
+      Map<String, Object> youtubeExtensions = new HashMap<>();
+      youtubeExtensions.put("youtube", channel);
+      activity.setAdditionalProperty("extensions", youtubeExtensions);
+    } catch (Throwable cause) {
+      throw new ActivitySerializerException(cause);
+    }
+  }
+
+  /**
+   * Builds the actor that represents a channel.
+   *
+   * <p>The image is attached only when the channel has a "high" thumbnail, and the
+   * "followers"/"posts" extensions only when the channel carries statistics; a channel
+   * lacking either still yields an actor instead of a NullPointerException.
+   *
+   * @param channel Channel; its snippet must be populated
+   * @return $.actor
+   */
+  public static ActivityObject createActorForChannel(Channel channel) {
+    ActivityObject actor = new ActivityObject();
+    // TODO: use generic provider id concatenator
+    actor.setId("id:youtube:" + channel.getId());
+    actor.setSummary(channel.getSnippet().getDescription());
+    actor.setDisplayName(channel.getSnippet().getTitle());
+    // The thumbnail set, or its "high" entry, may be absent.
+    ThumbnailDetails thumbnails = channel.getSnippet().getThumbnails();
+    if (thumbnails != null && thumbnails.getHigh() != null) {
+      Image image = new Image();
+      image.setUrl(thumbnails.getHigh().getUrl());
+      actor.setImage(image);
+    }
+    // NOTE(review): this appends the channel id to a "/user/" path - confirm that is the
+    // intended public URL form for a channel id.
+    actor.setUrl("https://youtube.com/user/" + channel.getId());
+    Map<String, Object> actorExtensions = new HashMap<>();
+    if (channel.getStatistics() != null) {
+      actorExtensions.put("followers", channel.getStatistics().getSubscriberCount());
+      actorExtensions.put("posts", channel.getStatistics().getVideoCount());
+    }
+    actor.setAdditionalProperty("extensions", actorExtensions);
+    return actor;
+  }
+
+  /**
+   * Creates the "video" activity object for a video: its watch URL plus, when the snippet
+   * has a default thumbnail, an image carrying that thumbnail's url, height and width.
+   * @param video Video
+   * @return Activity Object with Video URL and a thumbnail image
+   */
+  private static ActivityObject buildActivityObject(Video video) {
+    ActivityObject videoObject = new ActivityObject();
+
+    Thumbnail defaultThumbnail = video.getSnippet().getThumbnails().getDefault();
+    if (defaultThumbnail != null) {
+      Image thumbnailImage = new Image();
+      thumbnailImage.setUrl(defaultThumbnail.getUrl());
+      thumbnailImage.setHeight(defaultThumbnail.getHeight());
+      thumbnailImage.setWidth(defaultThumbnail.getWidth());
+      videoObject.setImage(thumbnailImage);
+    }
+
+    String watchUrl = "https://www.youtube.com/watch?v=" + video.getId();
+    videoObject.setUrl(watchUrl);
+    videoObject.setObjectType("video");
+
+    return videoObject;
+  }
+
+  /**
+   * Add the Youtube extensions to the Activity object that we're building.
+   *
+   * <p>The video itself is stored under "youtube". When the video carries statistics, a
+   * "likes" extension is added whose "count" is the video's like count.
+   * @param activity Activity
+   * @param video Video
+   */
+  private static void addYoutubeExtensions(Activity activity, Video video) {
+    Map<String, Object> extensions = ExtensionUtil.getInstance().ensureExtensions(activity);
+
+    extensions.put("youtube", video);
+
+    if (video.getStatistics() != null) {
+      Map<String, Object> likes = new HashMap<>();
+      // The "likes" extension must report likes; it previously reported the comment count.
+      likes.put("count", video.getStatistics().getLikeCount());
+      extensions.put("likes", likes);
+    }
+  }
+
+  /**
+   * Creates the actor for a video: the id is "id:youtube:" followed by the given id, the
+   * display name and the "handle" property are the snippet's channel title, and the summary
+   * is the snippet's description.
+   * @param video Video
+   * @param id id
+   * @return Actor object
+   */
+  private static ActivityObject buildActor(Video video, String id) {
+    ActivityObject videoActor = new ActivityObject();
+    videoActor.setId("id:youtube:" + id);
+
+    String channelTitle = video.getSnippet().getChannelTitle();
+    videoActor.setDisplayName(channelTitle);
+    videoActor.setSummary(video.getSnippet().getDescription());
+    videoActor.setAdditionalProperty("handle", video.getSnippet().getChannelTitle());
+
+    return videoActor;
+  }
+
+  /**
+   * Creates a fresh {@link Provider} describing YouTube, with id "id:providers:youtube"
+   * and display name "YouTube".
+   * @return a provider object representing YouTube
+   */
+  public static Provider getProvider() {
+    Provider youtubeProvider = new Provider();
+    youtubeProvider.setDisplayName("YouTube");
+    youtubeProvider.setId("id:providers:youtube");
+    return youtubeProvider;
+  }
+
+  /**
+   * Joins the given parts with ':' behind the fixed "id:youtube" prefix, following the
+   * Apache Streams activity ID convention.
+   * @param idparts the parts of the ID to join
+   * @return a valid Activity ID in format "id:youtube:part1:part2:...partN"
+   */
+  public static String formatId(String... idparts) {
+    Stream<String> prefix = Stream.of("id:youtube");
+    Stream<String> parts = Arrays.stream(idparts);
+    return Stream.concat(prefix, parts).collect(Collectors.joining(":"));
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/serializer/YoutubeChannelDeserializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/serializer/YoutubeChannelDeserializer.java b/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/serializer/YoutubeChannelDeserializer.java
new file mode 100644
index 0000000..e5d0ed0
--- /dev/null
+++ b/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/serializer/YoutubeChannelDeserializer.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.youtube.serializer;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.api.client.util.DateTime;
+import com.google.api.services.youtube.model.Channel;
+import com.google.api.services.youtube.model.ChannelContentDetails;
+import com.google.api.services.youtube.model.ChannelLocalization;
+import com.google.api.services.youtube.model.ChannelSnippet;
+import com.google.api.services.youtube.model.ChannelStatistics;
+import com.google.api.services.youtube.model.ChannelTopicDetails;
+import com.google.api.services.youtube.model.Thumbnail;
+import com.google.api.services.youtube.model.ThumbnailDetails;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * YoutubeChannelDeserializer is a JsonDeserializer for Channel.
+ */
+public class YoutubeChannelDeserializer extends JsonDeserializer<Channel> {
+
+  /**
+   * Builds a {@link Channel} from the JSON tree at the parser's current position.
+   *
+   * <p>"etag" and "kind" are copied only when present and non-null; "id" is required.
+   * Topic details, statistics, content details and snippet are looked up anywhere in the
+   * tree and converted by the corresponding {@code set*} helper.
+   *
+   * @param jp parser positioned at the channel JSON
+   * @param ctxt deserialization context (unused)
+   * @return the populated channel
+   * @throws IOException if the tree cannot be read or a required field is missing
+   */
+  @Override
+  public Channel deserialize(JsonParser jp, DeserializationContext ctxt) throws IOException {
+    JsonNode node = jp.getCodec().readTree(jp);
+    try {
+      Channel channel = new Channel();
+      // findPath() never returns null (it yields a MissingNode), so the field itself is tested.
+      if (node.hasNonNull("etag")) {
+        channel.setEtag(node.get("etag").asText());
+      }
+      if (node.hasNonNull("kind")) {
+        channel.setKind(node.get("kind").asText());
+      }
+      channel.setId(node.get("id").asText());
+      channel.setTopicDetails(setTopicDetails(node.findValue("topicDetails")));
+      channel.setStatistics(setChannelStatistics(node.findValue("statistics")));
+      channel.setContentDetails(setContentDetails(node.findValue("contentDetails")));
+      channel.setSnippet(setChannelSnippet(node.findValue("snippet")));
+      return channel;
+    } catch (Exception ex) {
+      // Report malformed input as an IOException; JVM errors are left to propagate.
+      throw new IOException(ex);
+    }
+  }
+
+  /**
+   * Builds a {@link ChannelSnippet} from the "snippet" node.
+   *
+   * @param node the snippet node, or null when the channel JSON has none
+   * @return the populated snippet, or an empty snippet when {@code node} is null
+   */
+  protected ChannelSnippet setChannelSnippet(JsonNode node) {
+    ChannelSnippet snippet = new ChannelSnippet();
+    // The caller passes the result of findValue(), which is null when there is no snippet.
+    if (node == null) {
+      return snippet;
+    }
+    snippet.setTitle(node.get("title").asText());
+    snippet.setDescription(node.get("description").asText());
+    snippet.setPublishedAt(new DateTime(node.get("publishedAt").get("value").longValue()));
+    snippet.setLocalized(setLocalized(node.findValue("localized")));
+    snippet.setThumbnails(setThumbnails(node.findValue("thumbnails")));
+    return snippet;
+  }
+
+  /**
+   * Builds {@link ThumbnailDetails} holding the url of the "default", "high" and "medium"
+   * thumbnails.
+   *
+   * @param node the thumbnails node, or null
+   * @return the populated details, or empty details when {@code node} is null
+   */
+  protected ThumbnailDetails setThumbnails(JsonNode node) {
+    ThumbnailDetails thumbnails = new ThumbnailDetails();
+    if (node != null) {
+      String defaultUrl = node.get("default").get("url").asText();
+      thumbnails.setDefault(new Thumbnail().setUrl(defaultUrl));
+      String highUrl = node.get("high").get("url").asText();
+      thumbnails.setHigh(new Thumbnail().setUrl(highUrl));
+      String mediumUrl = node.get("medium").get("url").asText();
+      thumbnails.setMedium(new Thumbnail().setUrl(mediumUrl));
+    }
+    return thumbnails;
+  }
+
+  /**
+   * Builds a {@link ChannelLocalization} from the "localized" node.
+   *
+   * @param node the localized node, or null
+   * @return the localization with description and title set, or an empty one when
+   *     {@code node} is null
+   */
+  protected ChannelLocalization setLocalized(JsonNode node) {
+    ChannelLocalization localized = new ChannelLocalization();
+    if (node != null) {
+      localized.setDescription(node.get("description").asText());
+      localized.setTitle(node.get("title").asText());
+    }
+    return localized;
+  }
+
+  /**
+   * Builds {@link ChannelContentDetails} from the "contentDetails" node.
+   *
+   * @param node the contentDetails node, or null
+   * @return the populated details, or empty details when {@code node} is null
+   */
+  protected ChannelContentDetails setContentDetails(JsonNode node) {
+    ChannelContentDetails contentDetails = new ChannelContentDetails();
+    if (node == null) {
+      return contentDetails;
+    }
+    // Guard on the direct child that is actually read: findValue() also matches nested
+    // fields, in which case get() would return null.
+    if (node.has("googlePlusUserId")) {
+      contentDetails.setGooglePlusUserId(node.get("googlePlusUserId").asText());
+    }
+    contentDetails.setRelatedPlaylists(setRelatedPlaylists(node.findValue("relatedPlaylists")));
+    return contentDetails;
+  }
+
+  /**
+   * Builds the related playlists ("favorites", "likes", "uploads") from the
+   * "relatedPlaylists" node; each entry is copied only when present.
+   *
+   * @param node the relatedPlaylists node, or null
+   * @return the populated playlists, or empty playlists when {@code node} is null
+   */
+  protected ChannelContentDetails.RelatedPlaylists setRelatedPlaylists(JsonNode node) {
+    ChannelContentDetails.RelatedPlaylists playlists = new ChannelContentDetails.RelatedPlaylists();
+    if (node == null) {
+      return playlists;
+    }
+    // has() checks the same direct child that get() reads; findValue() would also match a
+    // nested field and then get() would return null.
+    if (node.has("favorites")) {
+      playlists.setFavorites(node.get("favorites").asText());
+    }
+    if (node.has("likes")) {
+      playlists.setLikes(node.get("likes").asText());
+    }
+    if (node.has("uploads")) {
+      playlists.setUploads(node.get("uploads").asText());
+    }
+    return playlists;
+  }
+
+  /**
+   * Builds {@link ChannelStatistics} from the "statistics" node. Each counter is copied
+   * only when its field is present, so one missing counter no longer fails the channel.
+   *
+   * @param node the statistics node, or null
+   * @return the populated statistics, or empty statistics when {@code node} is null
+   */
+  protected ChannelStatistics setChannelStatistics(JsonNode node) {
+    ChannelStatistics stats = new ChannelStatistics();
+    if (node == null) {
+      return stats;
+    }
+    stats.setCommentCount(bigIntegerOrNull(node, "commentCount"));
+    if (node.has("hiddenSubscriberCount")) {
+      stats.setHiddenSubscriberCount(node.get("hiddenSubscriberCount").asBoolean());
+    }
+    stats.setSubscriberCount(bigIntegerOrNull(node, "subscriberCount"));
+    stats.setVideoCount(bigIntegerOrNull(node, "videoCount"));
+    stats.setViewCount(bigIntegerOrNull(node, "viewCount"));
+    return stats;
+  }
+
+  /** Returns the named direct child as a BigInteger, or null when the field is absent. */
+  private static java.math.BigInteger bigIntegerOrNull(JsonNode node, String field) {
+    JsonNode value = node.get(field);
+    return value == null ? null : value.bigIntegerValue();
+  }
+
+  /**
+   * Builds {@link ChannelTopicDetails} from the "topicDetails" node.
+   *
+   * @param node the topicDetails node, or null
+   * @return details carrying the text of every "topicIds" entry (an empty list when the
+   *     field is absent), or empty details when {@code node} is null
+   */
+  protected ChannelTopicDetails setTopicDetails(JsonNode node) {
+    ChannelTopicDetails details = new ChannelTopicDetails();
+    if (node == null) {
+      return details;
+    }
+    List<String> topicIds = new LinkedList<>();
+    // "topicIds" may be missing; iterating a null node would throw.
+    JsonNode topicIdsNode = node.get("topicIds");
+    if (topicIdsNode != null) {
+      for (JsonNode jsonNode : topicIdsNode) {
+        topicIds.add(jsonNode.asText());
+      }
+    }
+    details.setTopicIds(topicIds);
+    return details;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/serializer/YoutubeEventClassifier.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/serializer/YoutubeEventClassifier.java b/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/serializer/YoutubeEventClassifier.java
new file mode 100644
index 0000000..964d6ca
--- /dev/null
+++ b/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/serializer/YoutubeEventClassifier.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.youtube.serializer;
+
+import org.apache.streams.jackson.StreamsJacksonMapper;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.api.services.youtube.model.Video;
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * Guesses which YouTube model class a JSON document represents by inspecting its
+ * top-level "kind" field.
+ */
+public class YoutubeEventClassifier {
+  private static final ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+  // Includes the surrounding quotes because it is compared against JsonNode.toString().
+  private static final String VIDEO_IDENTIFIER = "\"youtube#video\"";
+  private static final String CHANNEL_IDENTIFIER = "youtube#channel";
+
+  /**
+   * detect probable Class of a json String from YouTube.
+   *
+   * <p>Returns {@link Video} when the top-level "kind" is exactly "youtube#video", the
+   * YouTube {@code Channel} model class when it contains "youtube#channel", and
+   * {@link ObjectNode} otherwise - including documents without a top-level "kind" and
+   * JSON that is not an object.
+   *
+   * @param json json; must be non-null and non-empty
+   * @return Class, or null when the json cannot be parsed
+   */
+  public static Class detectClass(String json) {
+    Objects.requireNonNull(json);
+    Preconditions.checkArgument(StringUtils.isNotEmpty(json));
+
+    // Kept as a generic JsonNode: valid JSON need not be an object, and a blind cast to
+    // ObjectNode would throw ClassCastException for arrays or scalars.
+    com.fasterxml.jackson.databind.JsonNode root;
+    try {
+      root = mapper.readTree(json);
+    } catch (IOException ex) {
+      ex.printStackTrace();
+      return null;
+    }
+
+    // Read the same direct child that is compared below; a "kind" that only exists in a
+    // nested object must not be dereferenced at the top level.
+    com.fasterxml.jackson.databind.JsonNode kindNode = root == null ? null : root.get("kind");
+    if (kindNode == null) {
+      return ObjectNode.class;
+    }
+
+    String kind = kindNode.toString();
+    if (kind.equals(VIDEO_IDENTIFIER)) {
+      return Video.class;
+    } else if (kind.contains(CHANNEL_IDENTIFIER)) {
+      return com.google.api.services.youtube.model.Channel.class;
+    } else {
+      return ObjectNode.class;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/serializer/YoutubeVideoDeserializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/serializer/YoutubeVideoDeserializer.java b/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/serializer/YoutubeVideoDeserializer.java
new file mode 100644
index 0000000..24f8e51
--- /dev/null
+++ b/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/serializer/YoutubeVideoDeserializer.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.youtube.serializer;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.api.client.util.DateTime;
+import com.google.api.services.youtube.model.Thumbnail;
+import com.google.api.services.youtube.model.ThumbnailDetails;
+import com.google.api.services.youtube.model.Video;
+import com.google.api.services.youtube.model.VideoSnippet;
+import com.google.api.services.youtube.model.VideoStatistics;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public class YoutubeVideoDeserializer extends JsonDeserializer<Video> {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(YoutubeVideoDeserializer.class);
+
+  /**
+   * Because the Youtube Video object contains complex objects within its hierarchy, we have to use
+   * a custom deserializer.
+   *
+   * <p>Deserialization is best-effort: if a field cannot be read, the failure is logged and
+   * the video is returned with whatever was populated up to that point.
+   *
+   * @param jsonParser jsonParser
+   * @param deserializationContext deserializationContext
+   * @return The deserialized {@link Video} object
+   * @throws java.io.IOException if the JSON tree cannot be read from the parser
+   */
+  @Override
+  public Video deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException {
+    JsonNode node = jsonParser.getCodec().readTree(jsonParser);
+    Video video = new Video();
+
+    try {
+      video.setId(node.get("id").asText());
+      video.setEtag(node.get("etag").asText());
+      video.setKind(node.get("kind").asText());
+
+      video.setSnippet(buildSnippet(node));
+      video.setStatistics(buildStatistics(node));
+    } catch (Exception ex) {
+      // The exception goes in as the trailing argument with no "{}" placeholder; SLF4J then
+      // logs its stack trace and the message carries no unfilled placeholder.
+      LOGGER.error("Exception while trying to deserialize a Video object", ex);
+    }
+
+    return video;
+  }
+
+  /**
+   * Given the raw JsonNode, construct a video snippet object.
+   *
+   * <p>Each entry of the "thumbnails" object ("default", "medium", "high", "standard",
+   * "maxres") is stored under its own size; absent sizes are left unset.
+   * @param node JsonNode of the whole video; must contain a "snippet" object
+   * @return VideoSnippet
+   */
+  private VideoSnippet buildSnippet(JsonNode node) {
+    VideoSnippet snippet = new VideoSnippet();
+    JsonNode snippetNode = node.get("snippet");
+
+    snippet.setChannelId(snippetNode.get("channelId").asText());
+    snippet.setChannelTitle(snippetNode.get("channelTitle").asText());
+    snippet.setDescription(snippetNode.get("description").asText());
+    snippet.setTitle(snippetNode.get("title").asText());
+    snippet.setPublishedAt(new DateTime(snippetNode.get("publishedAt").get("value").asLong()));
+
+    ThumbnailDetails thumbnailDetails = new ThumbnailDetails();
+    JsonNode thumbnailsNode = snippetNode.get("thumbnails");
+    if (thumbnailsNode != null) {
+      // Map each size to its own slot. Assigning every entry to "default" in a loop kept
+      // only the last thumbnail and mislabelled it.
+      thumbnailDetails.setDefault(buildThumbnail(thumbnailsNode.get("default")));
+      thumbnailDetails.setMedium(buildThumbnail(thumbnailsNode.get("medium")));
+      thumbnailDetails.setHigh(buildThumbnail(thumbnailsNode.get("high")));
+      thumbnailDetails.setStandard(buildThumbnail(thumbnailsNode.get("standard")));
+      thumbnailDetails.setMaxres(buildThumbnail(thumbnailsNode.get("maxres")));
+    }
+
+    snippet.setThumbnails(thumbnailDetails);
+
+    return snippet;
+  }
+
+  /** Converts one thumbnail node into a Thumbnail, or returns null when the node is absent. */
+  private static Thumbnail buildThumbnail(JsonNode thumbnailNode) {
+    if (thumbnailNode == null) {
+      return null;
+    }
+    Thumbnail thumbnail = new Thumbnail();
+    if (thumbnailNode.has("height")) {
+      thumbnail.setHeight(thumbnailNode.get("height").asLong());
+    }
+    if (thumbnailNode.has("url")) {
+      thumbnail.setUrl(thumbnailNode.get("url").asText());
+    }
+    if (thumbnailNode.has("width")) {
+      thumbnail.setWidth(thumbnailNode.get("width").asLong());
+    }
+    return thumbnail;
+  }
+
+  /**
+   * Given the raw JsonNode, construct a statistics object.
+   *
+   * <p>Statistics are optional: a video without a "statistics" object yields null, and
+   * each counter is copied only when its field is present.
+   * @param node JsonNode of the whole video
+   * @return VideoStatistics, or null when the video has no "statistics" object
+   */
+  private VideoStatistics buildStatistics(JsonNode node) {
+    JsonNode statisticsNode = node.get("statistics");
+    if (statisticsNode == null) {
+      return null;
+    }
+
+    VideoStatistics statistics = new VideoStatistics();
+    statistics.setCommentCount(countOrNull(statisticsNode, "commentCount"));
+    statistics.setDislikeCount(countOrNull(statisticsNode, "dislikeCount"));
+    statistics.setFavoriteCount(countOrNull(statisticsNode, "favoriteCount"));
+    statistics.setLikeCount(countOrNull(statisticsNode, "likeCount"));
+    statistics.setViewCount(countOrNull(statisticsNode, "viewCount"));
+
+    return statistics;
+  }
+
+  /** Returns the named direct child as a BigInteger, or null when the field is absent. */
+  private static java.math.BigInteger countOrNull(JsonNode statisticsNode, String field) {
+    JsonNode value = statisticsNode.get(field);
+    return value == null ? null : value.bigIntegerValue();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-youtube/src/main/jsonschema/com/youtube/YoutubeConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-youtube/src/main/jsonschema/com/youtube/YoutubeConfiguration.json b/streams-contrib/streams-provider-youtube/src/main/jsonschema/com/youtube/YoutubeConfiguration.json
deleted file mode 100644
index 9b57394..0000000
--- a/streams-contrib/streams-provider-youtube/src/main/jsonschema/com/youtube/YoutubeConfiguration.json
+++ /dev/null
@@ -1,105 +0,0 @@
-{
-  "$schema": "http://json-schema.org/draft-03/schema",
-  "$license": [
-    "http://www.apache.org/licenses/LICENSE-2.0"
-  ],
-  "id": "#",
-  "type": "object",
-  "javaType" : "org.apache.youtube.pojo.YoutubeConfiguration",
-  "javaInterfaces": ["java.io.Serializable"],
-  "properties": {
-    "protocol": {
-      "type": "string",
-      "description": "The protocol"
-    },
-    "host": {
-      "type": "string",
-      "description": "The host"
-    },
-    "port": {
-      "type": "integer",
-      "description": "The port"
-    },
-    "version": {
-      "type": "string",
-      "description": "The version"
-    },
-    "endpoint": {
-      "type": "string",
-      "description": "The endpoint"
-    },
-    "apiKey": {
-      "type": "string",
-      "description": "API key to allow for authenticated (but not owned) requests"
-    },
-    "follow": {
-      "type": "array",
-      "description": "DEPRECATED. A list of user names, indicating the users whose activities should be delivered on the stream",
-      "items": {
-        "type": "string"
-      }
-    },
-    "youtubeUsers": {
-      "type": "array",
-      "description": "A list of user user ids and optional date parameters for the GPlus provider",
-      "items": {
-        "type": "object",
-        "$ref": "#/definitions/userInfo"
-      }
-    },
-    "defaultAfterDate": {
-      "type": "string",
-      "format": "date-time",
-      "description": "Optional parameter for the provider. If this value is not null an the afterDate value in the userInfo is null, this value will be used."
-    },
-    "defaultBeforeDate": {
-      "type": "string",
-      "format": "date-time",
-      "description": "Optional parameter for the provider. If this value is not null and the beforeDate value in the userInfo is null, this value will be used."
-    },
-    "oauth": {
-      "type": "object",
-      "dynamic": "true",
-      "javaType" : "org.apache.streams.google.gplus.GPlusOAuthConfiguration",
-      "javaInterfaces": ["java.io.Serializable"],
-      "description": "DEPRICATED",
-      "properties": {
-        "appName": {
-          "type": "string"
-        },
-        "pathToP12KeyFile": {
-          "type": "string",
-          "description": "Absolute Path to key file"
-        },
-        "serviceAccountEmailAddress": {
-          "type": "string",
-          "description": "Service Account email address for your app"
-        }
-      }
-    }
-  },
-  "definitions": {
-    "userInfo": {
-      "type": "object",
-      "javaInterfaces" : ["java.io.Serializable"],
-      "dynamic": "true",
-      "javaType": "org.apache.streams.google.gplus.configuration.UserInfo",
-      "properties": {
-        "userId": {
-          "type": "string",
-          "description": "Google+ user id"
-        },
-        "afterDate": {
-          "type": "string",
-          "format": "date-time",
-          "description": "If the api allows to gather data by date range, this date will be used as the start of the range for the request for this user. If this is null it will use the defaultBeforeDate."
-        },
-        "beforeDate": {
-          "type": "string",
-          "format": "date-time",
-          "description": "If the api allows to gather data by date range, this date will be used as the end of the range for the request for this user.. If this is null it will use the defaultAfterDate."
-        }
-      }
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-youtube/src/main/jsonschema/org/apache/streams/youtube/YoutubeConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-youtube/src/main/jsonschema/org/apache/streams/youtube/YoutubeConfiguration.json b/streams-contrib/streams-provider-youtube/src/main/jsonschema/org/apache/streams/youtube/YoutubeConfiguration.json
new file mode 100644
index 0000000..7e80731
--- /dev/null
+++ b/streams-contrib/streams-provider-youtube/src/main/jsonschema/org/apache/streams/youtube/YoutubeConfiguration.json
@@ -0,0 +1,105 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema",
+  "$license": [
+    "http://www.apache.org/licenses/LICENSE-2.0"
+  ],
+  "id": "#",
+  "type": "object",
+  "javaType" : "org.apache.streams.youtube.YoutubeConfiguration",
+  "javaInterfaces": ["java.io.Serializable"],
+  "properties": {
+    "protocol": {
+      "type": "string",
+      "description": "The protocol"
+    },
+    "host": {
+      "type": "string",
+      "description": "The host"
+    },
+    "port": {
+      "type": "integer",
+      "description": "The port"
+    },
+    "version": {
+      "type": "string",
+      "description": "The version"
+    },
+    "endpoint": {
+      "type": "string",
+      "description": "The endpoint"
+    },
+    "apiKey": {
+      "type": "string",
+      "description": "API key to allow for authenticated (but not owned) requests"
+    },
+    "follow": {
+      "type": "array",
+      "description": "DEPRECATED. A list of user names, indicating the users whose activities should be delivered on the stream",
+      "items": {
+        "type": "string"
+      }
+    },
+    "youtubeUsers": {
+      "type": "array",
+      "description": "A list of user ids and optional date parameters for the YouTube provider",
+      "items": {
+        "type": "object",
+        "$ref": "#/definitions/userInfo"
+      }
+    },
+    "defaultAfterDate": {
+      "type": "string",
+      "format": "date-time",
+      "description": "Optional parameter for the provider. If this value is not null and the afterDate value in the userInfo is null, this value will be used."
+    },
+    "defaultBeforeDate": {
+      "type": "string",
+      "format": "date-time",
+      "description": "Optional parameter for the provider. If this value is not null and the beforeDate value in the userInfo is null, this value will be used."
+    },
+    "oauth": {
+      "type": "object",
+      "dynamic": "true",
+      "javaType" : "org.apache.streams.google.gplus.GPlusOAuthConfiguration",
+      "javaInterfaces": ["java.io.Serializable"],
+      "description": "DEPRECATED",
+      "properties": {
+        "appName": {
+          "type": "string"
+        },
+        "pathToP12KeyFile": {
+          "type": "string",
+          "description": "Absolute Path to key file"
+        },
+        "serviceAccountEmailAddress": {
+          "type": "string",
+          "description": "Service Account email address for your app"
+        }
+      }
+    }
+  },
+  "definitions": {
+    "userInfo": {
+      "type": "object",
+      "javaInterfaces" : ["java.io.Serializable"],
+      "dynamic": "true",
+      "javaType": "org.apache.streams.google.gplus.configuration.UserInfo",
+      "properties": {
+        "userId": {
+          "type": "string",
+          "description": "Google+ user id"
+        },
+        "afterDate": {
+          "type": "string",
+          "format": "date-time",
+          "description": "If the API allows gathering data by date range, this date will be used as the start of the range for the request for this user. If this is null it will use the defaultAfterDate."
+        },
+        "beforeDate": {
+          "type": "string",
+          "format": "date-time",
+          "description": "If the API allows gathering data by date range, this date will be used as the end of the range for the request for this user. If this is null it will use the defaultBeforeDate."
+        }
+      }
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/processor/YoutubeTypeConverterTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/processor/YoutubeTypeConverterTest.java b/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/processor/YoutubeTypeConverterTest.java
deleted file mode 100644
index 469b8d0..0000000
--- a/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/processor/YoutubeTypeConverterTest.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package com.youtube.processor;
-
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.pojo.json.Activity;
-
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.module.SimpleModule;
-import com.google.api.services.youtube.model.Video;
-import com.youtube.serializer.YoutubeActivityUtil;
-import com.youtube.serializer.YoutubeVideoDeserializer;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-
-/**
- * Test for YoutubeTypeConverter.
- */
-public class YoutubeTypeConverterTest {
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(YoutubeTypeConverterTest.class);
-  private final String testVideo = "{\"etag\":\"\\\"4FSIjSQU83ZJMYAO0IqRYMvZX98/V0q3OIauZ3ZAkszLUDbHL45yEGM\\\"\",\"id\":\"sUOepRctwVE\",\"kind\":\"youtube#video\",\"snippet\":{\"channelId\":\"UCNENOn2nmwguQYkejKhJGPQ\",\"channelTitle\":\"Carilion Clinic\",\"description\":\"Join Carilion Clinic's Heart Failure experts for a LIVE Google+ Hangout on Feb. 23, 12:30-1 p.m. to learn more about heart failure, treatment options, and lifestyle changes. Learn more: https://plus.google.com/u/0/events/cj074q9r6csgv6i2kqhi2isc6k0\",\"publishedAt\":{\"value\":1422977409000,\"dateOnly\":false,\"timeZoneShift\":-360},\"thumbnails\":{\"default\":{\"height\":480,\"url\":\"https://i.ytimg.com/vi/sUOepRctwVE/sddefault.jpg\",\"width\":640}},\"title\":\"Be Heart Smart: Congestive Heart Failure LIVE Event\"},\"statistics\":{\"commentCount\":1,\"dislikeCount\":0,\"favoriteCount\":0,\"likeCount\":0,\"viewCount\":9}}";
-
-  private YoutubeTypeConverter youtubeTypeConverter;
-  private ObjectMapper objectMapper;
-
-  /**
-   * setup for test.
-   */
-  @Before
-  public void setup() {
-    objectMapper = StreamsJacksonMapper.getInstance();
-    SimpleModule simpleModule = new SimpleModule();
-    simpleModule.addDeserializer(Video.class, new YoutubeVideoDeserializer());
-    objectMapper.registerModule(simpleModule);
-    objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-
-    youtubeTypeConverter = new YoutubeTypeConverter();
-    youtubeTypeConverter.prepare(null);
-  }
-
-  @Test
-  public void testVideoConversion() {
-    try {
-      LOGGER.info("raw: {}", testVideo);
-      Activity activity = new Activity();
-
-      Video video = objectMapper.readValue(testVideo, Video.class);
-      StreamsDatum streamsDatum = new StreamsDatum(video);
-
-      assertNotNull(streamsDatum.getDocument());
-
-      List<StreamsDatum> retList = youtubeTypeConverter.process(streamsDatum);
-      YoutubeActivityUtil.updateActivity(video, activity, "testChannelId");
-
-      assertEquals(retList.size(), 1);
-      assert (retList.get(0).getDocument() instanceof Activity);
-      assertEquals(activity, retList.get(0).getDocument());
-    } catch (Exception ex) {
-      LOGGER.error("Exception while trying to convert video to activity: {}", ex);
-    }
-  }
-
-  @Test
-  public void testStringVideoConversion() {
-    try {
-      LOGGER.info("raw: {}", testVideo);
-      Activity activity = new Activity();
-
-      Video video = objectMapper.readValue(testVideo, Video.class);
-      StreamsDatum streamsDatum = new StreamsDatum(testVideo);
-
-      assertNotNull(streamsDatum.getDocument());
-
-      List<StreamsDatum> retList = youtubeTypeConverter.process(streamsDatum);
-      YoutubeActivityUtil.updateActivity(video, activity, "testChannelId");
-
-      assertEquals(retList.size(), 1);
-      assert (retList.get(0).getDocument() instanceof Activity);
-      assertEquals(activity, retList.get(0).getDocument());
-    } catch (Exception ex) {
-      LOGGER.error("Exception while trying to convert video to activity: {}", ex);
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/provider/YoutubeChannelDataCollectorTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/provider/YoutubeChannelDataCollectorTest.java b/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/provider/YoutubeChannelDataCollectorTest.java
deleted file mode 100644
index 0ae4822..0000000
--- a/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/provider/YoutubeChannelDataCollectorTest.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package com.youtube.provider;
-
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.google.gplus.configuration.UserInfo;
-import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
-import org.apache.streams.util.api.requests.backoff.impl.LinearTimeBackOffStrategy;
-
-import com.google.api.services.youtube.YouTube;
-import com.google.api.services.youtube.model.Channel;
-import com.google.api.services.youtube.model.ChannelListResponse;
-import org.apache.youtube.pojo.YoutubeConfiguration;
-import org.junit.Test;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-/**
- * YoutubeChannelDataCollectorTest tests YoutubeChannelDataCollector.
- */
-public class YoutubeChannelDataCollectorTest {
-
-  private static final String ID = "12345";
-
-  @Test
-  public void testDataCollector() throws Exception {
-    YouTube youTube = createMockYoutube();
-    BlockingQueue<StreamsDatum> queue = new LinkedBlockingQueue<>();
-    BackOffStrategy strategy = new LinearTimeBackOffStrategy(1);
-    UserInfo userInfo = new UserInfo();
-    userInfo.setUserId(ID);
-    YoutubeConfiguration config = new YoutubeConfiguration();
-    config.setApiKey(ID);
-    YoutubeChannelDataCollector collector = new YoutubeChannelDataCollector(youTube, queue, strategy, userInfo, config);
-    collector.run();
-    assertEquals(1, queue.size());
-    StreamsDatum datum = queue.take();
-    assertNotNull(datum);
-    String document = (String) datum.getDocument();
-    assertNotNull(document);
-  }
-
-  private YouTube createMockYoutube() throws Exception {
-    YouTube mockYouTube = mock(YouTube.class);
-    YouTube.Channels channels = createMockChannels();
-    when(mockYouTube.channels()).thenReturn(channels);
-    return mockYouTube;
-  }
-
-  private YouTube.Channels createMockChannels() throws Exception {
-    YouTube.Channels mockChannels = mock(YouTube.Channels.class);
-    YouTube.Channels.List channelLists = createMockChannelsList();
-    when(mockChannels.list(anyString())).thenReturn(channelLists);
-    return mockChannels;
-  }
-
-  private YouTube.Channels.List createMockChannelsList() throws Exception {
-    YouTube.Channels.List mockList = mock(YouTube.Channels.List.class);
-    when(mockList.setId(anyString())).thenReturn(mockList);
-    when(mockList.setKey(anyString())).thenReturn(mockList);
-    ChannelListResponse response = createMockResponse();
-    when(mockList.execute()).thenReturn(response);
-    return mockList;
-  }
-
-  private ChannelListResponse createMockResponse() {
-    ChannelListResponse response = new ChannelListResponse();
-    List<Channel> channelList = new LinkedList<>();
-    response.setItems(channelList);
-    Channel channel = new Channel();
-    channel.setId(ID);
-    channelList.add(channel);
-    return response;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/provider/YoutubeProviderTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/provider/YoutubeProviderTest.java b/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/provider/YoutubeProviderTest.java
deleted file mode 100644
index 121c3d5..0000000
--- a/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/provider/YoutubeProviderTest.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package com.youtube.provider;
-
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.google.gplus.configuration.UserInfo;
-import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
-
-import com.google.api.services.youtube.YouTube;
-import org.apache.youtube.pojo.YoutubeConfiguration;
-import org.joda.time.DateTime;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-
-/**
- * Test for YoutubeProvider.
- */
-public class YoutubeProviderTest {
-
-  /**
-   * Test that every collector will be run and that data queued from the collectors will be processed.
-   */
-  @Test
-  public void testDataCollectorRunsPerUser() {
-    Random random = new Random(System.currentTimeMillis());
-    int numUsers = random.nextInt(1000);
-    List<UserInfo> userList = new LinkedList<>();
-
-    for ( int i = 0; i < numUsers; ++i ) {
-      userList.add(new UserInfo());
-    }
-
-    YoutubeConfiguration config = new YoutubeConfiguration();
-    config.setYoutubeUsers(userList);
-    config.setApiKey("API_KEY");
-    YoutubeProvider provider = buildProvider(config);
-
-    try {
-      provider.prepare(null);
-      provider.startStream();
-      int datumCount = 0;
-      while (provider.isRunning()) {
-        datumCount += provider.readCurrent().size();
-      }
-      assertEquals(numUsers, datumCount);
-    } finally {
-      provider.cleanUp();
-    }
-  }
-
-  @Test
-  public void testConfigSetterGetter() {
-    YoutubeConfiguration config = new YoutubeConfiguration();
-    config.setApiKey("API_KEY");
-    config.setVersion("fake_version_1");
-    YoutubeConfiguration newConfig = new YoutubeConfiguration();
-    newConfig.setApiKey("API_KEY");
-    config.setVersion("fake_version_2");
-
-    YoutubeProvider provider = buildProvider(config);
-
-    assertEquals(provider.getConfig(), config);
-
-    provider.setConfig(newConfig);
-    assertEquals(provider.getConfig(), newConfig);
-  }
-
-  @Test
-  public void testUserInfoWithDefaultDates() {
-    YoutubeConfiguration config = new YoutubeConfiguration();
-    config.setApiKey("API_KEY");
-    YoutubeProvider provider = buildProvider(config);
-
-    DateTime afterDate = new DateTime(System.currentTimeMillis());
-    DateTime beforeDate = afterDate.minus(10000);
-
-    provider.setDefaultAfterDate(afterDate);
-    provider.setDefaultBeforeDate(beforeDate);
-
-    Set<String> users = new HashSet<>();
-    users.add("test_user_1");
-    users.add("test_user_2");
-    users.add("test_user_3");
-
-    provider.setUserInfoWithDefaultDates(users);
-
-    List<UserInfo> youtubeUsers = provider.getConfig().getYoutubeUsers();
-
-    for (UserInfo user : youtubeUsers) {
-      assert (user.getAfterDate().equals(afterDate));
-      assert (user.getBeforeDate().equals(beforeDate));
-    }
-  }
-
-  @Test
-  public void testUserInfoWithAfterDate() {
-    YoutubeConfiguration config = new YoutubeConfiguration();
-    config.setApiKey("API_KEY");
-    YoutubeProvider provider = buildProvider(config);
-
-    Map<String, DateTime> users = new HashMap<>();
-    users.put("user1", new DateTime(System.currentTimeMillis()));
-    users.put("user3", new DateTime(System.currentTimeMillis()));
-    users.put("user4", new DateTime(System.currentTimeMillis()));
-
-    provider.setUserInfoWithAfterDate(users);
-
-    List<UserInfo> youtubeUsers = provider.getConfig().getYoutubeUsers();
-
-    for (UserInfo user : youtubeUsers) {
-      assert (user.getAfterDate().equals(users.get(user.getUserId())));
-    }
-  }
-
-  private YoutubeProvider buildProvider(YoutubeConfiguration config) {
-    return new YoutubeProvider(config) {
-
-      @Override
-      protected YouTube createYouTubeClient() throws IOException {
-        return mock(YouTube.class);
-      }
-
-      @Override
-      protected Runnable getDataCollector(BackOffStrategy strategy, BlockingQueue<StreamsDatum> queue, YouTube youtube, UserInfo userInfo) {
-        final BlockingQueue<StreamsDatum> q = queue;
-        return () -> {
-          try {
-            q.put(new StreamsDatum(null));
-          } catch (InterruptedException ie) {
-            fail("Test was interrupted");
-          }
-        };
-      }
-    };
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/provider/YoutubeUserActivityCollectorTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/provider/YoutubeUserActivityCollectorTest.java b/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/provider/YoutubeUserActivityCollectorTest.java
deleted file mode 100644
index d6a540e..0000000
--- a/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/provider/YoutubeUserActivityCollectorTest.java
+++ /dev/null
@@ -1,354 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package com.youtube.provider;
-
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.google.gplus.configuration.UserInfo;
-import org.apache.streams.local.queues.ThroughputQueue;
-import org.apache.streams.util.api.requests.backoff.impl.ExponentialBackOffStrategy;
-
-import com.google.api.services.youtube.YouTube;
-import com.google.api.services.youtube.model.Activity;
-import com.google.api.services.youtube.model.ActivityContentDetails;
-import com.google.api.services.youtube.model.ActivityContentDetailsUpload;
-import com.google.api.services.youtube.model.ActivityListResponse;
-import com.google.api.services.youtube.model.Video;
-import com.google.api.services.youtube.model.VideoListResponse;
-import com.google.api.services.youtube.model.VideoSnippet;
-import org.apache.youtube.pojo.YoutubeConfiguration;
-import org.joda.time.DateTime;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.anyLong;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-/**
- * Test for YoutubeUserActivityCollector.
- */
-public class YoutubeUserActivityCollectorTest {
-  private static final String USER_ID = "fake_user_id";
-  private static final String IN_RANGE_IDENTIFIER = "data in range";
-  private YoutubeConfiguration config;
-
-  @Before
-  public void setup() {
-    this.config = new YoutubeConfiguration();
-    this.config.setApiKey("API_KEY");
-  }
-
-  @Test
-  public void testGetVideos() throws IOException {
-    DateTime now = new DateTime(System.currentTimeMillis());
-    YouTube youtube = buildYouTube(0, 1, 0, now, now.minus(10000));
-
-    BlockingQueue<StreamsDatum> datumQueue = new ThroughputQueue<>();
-    YoutubeUserActivityCollector collector = new YoutubeUserActivityCollector(youtube, datumQueue, new ExponentialBackOffStrategy(2), new UserInfo().withUserId(USER_ID), this.config);
-
-    List<Video> video = collector.getVideoList("fake_video_id");
-
-    assertNotNull(video.get(0));
-  }
-
-  @Test
-  public void testGetVideosNull() throws IOException {
-    DateTime now = new DateTime(System.currentTimeMillis());
-    YouTube youtube = buildYouTube(0, 0, 0, now.plus(10000), now.minus(10000));
-
-    BlockingQueue<StreamsDatum> datumQueue = new ThroughputQueue<>();
-    YoutubeUserActivityCollector collector = new YoutubeUserActivityCollector(youtube, datumQueue, new ExponentialBackOffStrategy(2), new UserInfo().withUserId(USER_ID), this.config);
-
-    List<Video> video = collector.getVideoList("fake_video_id");
-
-    assertEquals(video.size(), 0);
-  }
-
-  @Test
-  public void testProcessActivityFeed() throws IOException, InterruptedException {
-    DateTime now = new DateTime(System.currentTimeMillis());
-    YouTube youtube = buildYouTube(0, 0, 5, now.plus(3000000), now.minus(1000000));
-
-    BlockingQueue<StreamsDatum> datumQueue = new ThroughputQueue<>();
-    YoutubeUserActivityCollector collector = new YoutubeUserActivityCollector(youtube, datumQueue, new ExponentialBackOffStrategy(2), new UserInfo().withUserId(USER_ID), this.config);
-
-    ActivityListResponse feed = buildActivityListResponse(1);
-
-    collector.processActivityFeed(feed, new DateTime(System.currentTimeMillis()), null);
-
-    assertEquals(collector.getDatumQueue().size(), 5);
-  }
-
-  @Test
-  public void testProcessActivityFeedBefore() throws IOException, InterruptedException {
-    DateTime now = new DateTime(System.currentTimeMillis());
-    YouTube youtube = buildYouTube(5, 0, 0, now, now);
-
-    BlockingQueue<StreamsDatum> datumQueue = new ThroughputQueue<>();
-    YoutubeUserActivityCollector collector = new YoutubeUserActivityCollector(youtube, datumQueue, new ExponentialBackOffStrategy(2), new UserInfo().withUserId(USER_ID), this.config);
-
-    ActivityListResponse feed = buildActivityListResponse(1);
-
-    collector.processActivityFeed(feed, new DateTime(System.currentTimeMillis()), null);
-
-    assertEquals(collector.getDatumQueue().size(), 0);
-  }
-
-  @Test
-  public void testProcessActivityFeedAfter() throws IOException, InterruptedException {
-    DateTime now = new DateTime(System.currentTimeMillis());
-    YouTube youtube = buildYouTube(0, 5, 0, now, now);
-
-    BlockingQueue<StreamsDatum> datumQueue = new ThroughputQueue<>();
-    YoutubeUserActivityCollector collector = new YoutubeUserActivityCollector(youtube, datumQueue, new ExponentialBackOffStrategy(2), new UserInfo().withUserId(USER_ID), this.config);
-
-    ActivityListResponse feed = buildActivityListResponse(1);
-
-    collector.processActivityFeed(feed, new DateTime(now.getMillis() - 100000), null);
-
-    assertEquals(collector.getDatumQueue().size(), 5);
-  }
-
-  @Test
-  public void testProcessActivityFeedMismatchCount() throws IOException, InterruptedException {
-    DateTime now = new DateTime(System.currentTimeMillis());
-    YouTube youtube = buildYouTube(5, 5, 5, now, now.minus(100000));
-
-    BlockingQueue<StreamsDatum> datumQueue = new ThroughputQueue<>();
-    YoutubeUserActivityCollector collector = new YoutubeUserActivityCollector(youtube, datumQueue, new ExponentialBackOffStrategy(2), new UserInfo().withUserId(USER_ID), this.config);
-
-    ActivityListResponse feed = buildActivityListResponse(1);
-
-    collector.processActivityFeed(feed, new DateTime(now), null);
-
-    assertEquals(collector.getDatumQueue().size(), 5);
-  }
-
-  @Test
-  public void testProcessActivityFeedMismatchCountInRange() throws IOException, InterruptedException {
-    DateTime now = new DateTime(System.currentTimeMillis());
-    YouTube youtube = buildYouTube(5, 5, 5, now, now.minus(100000));
-
-    BlockingQueue<StreamsDatum> datumQueue = new ThroughputQueue<>();
-    YoutubeUserActivityCollector collector = new YoutubeUserActivityCollector(youtube, datumQueue, new ExponentialBackOffStrategy(2), new UserInfo().withUserId(USER_ID), this.config);
-
-    ActivityListResponse feed = buildActivityListResponse(1);
-
-    collector.processActivityFeed(feed, new DateTime(now), new DateTime(now).minus(100000));
-
-    assertEquals(collector.getDatumQueue().size(), 5);
-  }
-
-  private ActivityListResponse buildActivityListResponse(int num) {
-    ActivityListResponse activityListResponse = new ActivityListResponse();
-    List<Activity> items = new ArrayList<>();
-
-    for ( int x = 0; x < num; x++ ) {
-      Activity activity = new Activity();
-
-      ActivityContentDetails contentDetails = new ActivityContentDetails();
-      ActivityContentDetailsUpload upload = new ActivityContentDetailsUpload();
-      upload.setVideoId("video_id_" + x);
-      contentDetails.setUpload(upload);
-
-      activity.setId("id_" + x);
-      activity.setContentDetails(contentDetails);
-
-      items.add(activity);
-    }
-
-    activityListResponse.setItems(items);
-
-    return activityListResponse;
-  }
-
-  private YouTube buildYouTube(int numBeforeRange, int numAfterRange, int numInRange, DateTime afterDate, DateTime beforeDate) {
-
-    return createYoutubeMock(numBeforeRange, numAfterRange, numInRange, afterDate, beforeDate);
-
-  }
-
-  private YouTube createYoutubeMock(int numBefore, int numAfter, int numInRange,  DateTime after, DateTime before) {
-    YouTube youtube = mock(YouTube.class);
-
-    final YouTube.Videos videos = createMockVideos(numBefore, numAfter, numInRange, after, before);
-    doAnswer(invocationOnMock -> videos).when(youtube).videos();
-
-    return youtube;
-  }
-
-  private YouTube.Videos createMockVideos(int numBefore, int numAfter, int numInRange,  DateTime after, DateTime before) {
-    YouTube.Videos videos = mock(YouTube.Videos.class);
-
-    try {
-      YouTube.Videos.List list = createMockVideosList(numBefore, numAfter, numInRange, after, before);
-      when(videos.list(anyString())).thenReturn(list);
-    } catch (IOException ex) {
-      fail("Exception thrown while creating mock");
-    }
-
-    return videos;
-  }
-
-  private YouTube.Videos.List createMockVideosList(int numBefore, int numAfter, int numInRange,  DateTime after, DateTime before) {
-    YouTube.Videos.List list = mock(YouTube.Videos.List.class);
-
-    when(list.setMaxResults(anyLong())).thenReturn(list);
-    when(list.setPageToken(anyString())).thenReturn(list);
-    when(list.setId(anyString())).thenReturn(list);
-    when(list.setKey(anyString())).thenReturn(list);
-
-    VideoListResponseAnswer answer = new VideoListResponseAnswer(numBefore, numAfter, numInRange, after, before);
-    try {
-      doAnswer(answer).when(list).execute();
-    } catch (IOException ioe) {
-      fail("Should not have thrown exception while creating mock. : " + ioe.getMessage());
-    }
-
-    return list;
-  }
-
-  private static VideoListResponse createMockVideoListResponse(int numBefore, int numAfter, int numInRange,  DateTime after, DateTime before, boolean page) {
-    VideoListResponse feed = new VideoListResponse();
-    List<Video> list = new LinkedList<>();
-
-    for (int i = 0; i < numAfter; ++i) {
-      com.google.api.client.util.DateTime published = new com.google.api.client.util.DateTime(after.getMillis() + 1000000);
-      Video video = new Video();
-      video.setSnippet(new VideoSnippet());
-      video.getSnippet().setPublishedAt(published);
-      list.add(video);
-    }
-    for (int i = 0; i < numInRange; ++i) {
-      DateTime published;
-      if ((before == null && after == null) || before == null) {
-        published = DateTime.now(); // no date range or end time date range so just make the time now.
-      } else if (after == null) {
-        published = before.minusMillis(100000); //no beginning to range
-      } else { // has to be in range
-        long range = before.getMillis() - after.getMillis();
-        published = after.plus(range / 2); //in the middle
-      }
-      com.google.api.client.util.DateTime ytPublished = new com.google.api.client.util.DateTime(published.getMillis());
-      Video video = new Video();
-      video.setSnippet(new VideoSnippet());
-      video.getSnippet().setPublishedAt(ytPublished);
-      video.getSnippet().setTitle(IN_RANGE_IDENTIFIER);
-      list.add(video);
-    }
-    for (int i = 0; i < numBefore; ++i) {
-      com.google.api.client.util.DateTime published = new com.google.api.client.util.DateTime((after.minusMillis(100000)).getMillis());
-      Video video = new Video();
-      video.setSnippet(new VideoSnippet());
-      video.getSnippet().setPublishedAt(published);
-      list.add(video);
-    }
-    if (page) {
-      feed.setNextPageToken("A");
-    } else {
-      feed.setNextPageToken(null);
-    }
-
-    feed.setItems(list);
-
-    return feed;
-  }
-
-  private static class VideoListResponseAnswer implements Answer<VideoListResponse> {
-    private int afterCount = 0;
-    private int beforeCount = 0;
-    private int inCount = 0;
-    private int maxBatch = 100;
-
-    private int numAfter;
-    private int numInRange;
-    private int numBefore;
-    private DateTime after;
-    private DateTime before;
-
-    private VideoListResponseAnswer(int numBefore, int numAfter, int numInRange, DateTime after, DateTime before) {
-      this.numBefore = numBefore;
-      this.numAfter = numAfter;
-      this.numInRange = numInRange;
-      this.after = after;
-      this.before = before;
-    }
-
-    @Override
-    public VideoListResponse answer(InvocationOnMock invocationOnMock) throws Throwable {
-      int totalCount = 0;
-      int batchAfter = 0;
-      int batchBefore = 0;
-      int batchIn = 0;
-      inCount = 0;
-      afterCount = 0;
-      beforeCount = 0;
-
-      if (afterCount != numAfter) {
-        if (numAfter - afterCount >= maxBatch) {
-          afterCount += maxBatch;
-          batchAfter += maxBatch;
-          totalCount += batchAfter;
-        } else {
-          batchAfter += numAfter - afterCount;
-          totalCount += numAfter - afterCount;
-          afterCount = numAfter;
-        }
-      }
-      if (totalCount < maxBatch && inCount != numInRange) {
-        if (numInRange - inCount >= maxBatch - totalCount) {
-          inCount += maxBatch - totalCount;
-          batchIn += maxBatch - totalCount;
-          totalCount += batchIn;
-        } else {
-          batchIn += numInRange - inCount;
-          totalCount += numInRange - inCount;
-          inCount = numInRange;
-        }
-      }
-      if (totalCount < maxBatch && beforeCount != numBefore) {
-        if (numBefore - batchBefore >= maxBatch - totalCount) {
-          batchBefore += maxBatch - totalCount;
-          totalCount = maxBatch;
-          beforeCount += batchBefore;
-        } else {
-          batchBefore += numBefore - beforeCount;
-          totalCount += numBefore - beforeCount;
-          beforeCount = numBefore;
-        }
-      }
-
-      return createMockVideoListResponse(batchBefore, batchAfter, batchIn, after, before, numAfter != afterCount || inCount != numInRange || beforeCount != numBefore);
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/serializer/YoutubeEventClassifierTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/serializer/YoutubeEventClassifierTest.java b/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/serializer/YoutubeEventClassifierTest.java
deleted file mode 100644
index 16565bb..0000000
--- a/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/serializer/YoutubeEventClassifierTest.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package com.youtube.serializer;
-
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.api.services.youtube.model.Video;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Tests for YoutubeEventClassifier.detectClass, driven by two JSON documents whose "kind"
- * values are "youtube#video" and "youtube#somethingElse".
- */
-public class YoutubeEventClassifierTest {
-
-  // JSON document whose "kind" is "youtube#video".
-  private final String testVideo = "{\"etag\":\"\\\"4FSIjSQU83ZJMYAO0IqRYMvZX98/V0q3OIauZ3ZAkszLUDbHL45yEGM\\\"\",\"id\":\"sUOepRctwVE\",\"kind\":\"youtube#video\",\"snippet\":{\"channelId\":\"UCNENOn2nmwguQYkejKhJGPQ\",\"channelTitle\":\"Carilion Clinic\",\"description\":\"Join Carilion Clinic's Heart Failure experts for a LIVE Google+ Hangout on Feb. 23, 12:30-1 p.m. to learn more about heart failure, treatment options, and lifestyle changes. Learn more: https://plus.google.com/u/0/events/cj074q9r6csgv6i2kqhi2isc6k0\",\"publishedAt\":{\"value\":1422977409000,\"dateOnly\":false,\"timeZoneShift\":-360},\"thumbnails\":{\"default\":{\"height\":480,\"url\":\"https://i.ytimg.com/vi/sUOepRctwVE/sddefault.jpg\",\"width\":640}},\"title\":\"Be Heart Smart: Congestive Heart Failure LIVE Event\"},\"statistics\":{\"commentCount\":1,\"dislikeCount\":0,\"favoriteCount\":0,\"likeCount\":0,\"viewCount\":9}}";
-  // JSON document whose "kind" is "youtube#somethingElse".
-  private final String testObjectNode = "{\"etag\":\"\\\"4FSIjSQU83ZJMYAO0IqRYMvZX98/V0q3OIauZ3ZAkszLUDbHL45yEGM\\\"\",\"id\":\"sUOepRctwVE\",\"kind\":\"youtube#somethingElse\",\"snippet\":{\"channelId\":\"UCNENOn2nmwguQYkejKhJGPQ\",\"channelTitle\":\"Carilion Clinic\",\"description\":\"Join Carilion Clinic's Heart Failure experts for a LIVE Google+ Hangout on Feb. 23, 12:30-1 p.m. to learn more about heart failure, treatment options, and lifestyle changes. Learn more: https://plus.google.com/u/0/events/cj074q9r6csgv6i2kqhi2isc6k0\",\"publishedAt\":{\"value\":1422977409000,\"dateOnly\":false,\"timeZoneShift\":-360},\"thumbnails\":{\"default\":{\"height\":480,\"url\":\"https://i.ytimg.com/vi/sUOepRctwVE/sddefault.jpg\",\"width\":640}},\"title\":\"Be Heart Smart: Congestive Heart Failure LIVE Event\"},\"statistics\":{\"commentCount\":1,\"dislikeCount\":0,\"favoriteCount\":0,\"likeCount\":0,\"viewCount\":9}}";
-
-  /**
-   * The "youtube#video" document is expected to be classified as Video.
-   */
-  @Test
-  public void testVideoClassification() {
-    Class klass = YoutubeEventClassifier.detectClass(testVideo);
-
-    // NOTE(review): JUnit's assertEquals takes (expected, actual); the arguments here are in the
-    // opposite order, which only affects the wording of a failure message.
-    assertEquals(klass, Video.class);
-  }
-
-  /**
-   * An empty string is expected to be rejected with IllegalArgumentException.
-   */
-  @Test(expected = IllegalArgumentException.class)
-  public void testExceptionClassification() {
-    YoutubeEventClassifier.detectClass("");
-  }
-
-  /**
-   * The "youtube#somethingElse" document is expected to be classified as ObjectNode.
-   */
-  @Test
-  public void testObjectNodeClassification() {
-    Class klass = YoutubeEventClassifier.detectClass(testObjectNode);
-
-    assertEquals(klass, ObjectNode.class);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/serializer/YoutubeVideoSerDeTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/serializer/YoutubeVideoSerDeTest.java b/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/serializer/YoutubeVideoSerDeTest.java
deleted file mode 100644
index 29afd19..0000000
--- a/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/serializer/YoutubeVideoSerDeTest.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package com.youtube.serializer;
-
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.pojo.extensions.ExtensionUtil;
-import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.pojo.json.ActivityObject;
-import org.apache.streams.pojo.json.Provider;
-
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.module.SimpleModule;
-import com.google.api.services.youtube.model.Video;
-import org.joda.time.DateTime;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Test for YoutubeVideoSerDe.
- *
- * <p>Deserializes a sample video JSON document into a Video, maps it onto an Activity through
- * YoutubeActivityUtil, and checks the fields of the resulting Activity.
- */
-public class YoutubeVideoSerDeTest {
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(YoutubeVideoSerDeTest.class);
-  // Sample JSON document with "kind" set to "youtube#video".
-  private final String testVideo = "{\"etag\":\"\\\"4FSIjSQU83ZJMYAO0IqRYMvZX98/V0q3OIauZ3ZAkszLUDbHL45yEGM\\\"\",\"id\":\"sUOepRctwVE\",\"kind\":\"youtube#video\",\"snippet\":{\"channelId\":\"UCNENOn2nmwguQYkejKhJGPQ\",\"channelTitle\":\"Carilion Clinic\",\"description\":\"Join Carilion Clinic's Heart Failure experts for a LIVE Google+ Hangout on Feb. 23, 12:30-1 p.m. to learn more about heart failure, treatment options, and lifestyle changes. Learn more: https://plus.google.com/u/0/events/cj074q9r6csgv6i2kqhi2isc6k0\",\"publishedAt\":{\"value\":1422977409000,\"dateOnly\":false,\"timeZoneShift\":-360},\"thumbnails\":{\"default\":{\"height\":480,\"url\":\"https://i.ytimg.com/vi/sUOepRctwVE/sddefault.jpg\",\"width\":640}},\"title\":\"Be Heart Smart: Congestive Heart Failure LIVE Event\"},\"statistics\":{\"commentCount\":1,\"dislikeCount\":0,\"favoriteCount\":0,\"likeCount\":0,\"viewCount\":9}}";
-  private ObjectMapper objectMapper;
-  private YoutubeActivityUtil youtubeActivityUtil;
-
-  /**
-   * Setup for test: takes the mapper from StreamsJacksonMapper, registers
-   * YoutubeVideoDeserializer for Video, turns off FAIL_ON_UNKNOWN_PROPERTIES, and creates the
-   * YoutubeActivityUtil under test.
-   */
-  @Before
-  public void setup() {
-    objectMapper = StreamsJacksonMapper.getInstance();
-    SimpleModule simpleModule = new SimpleModule();
-    simpleModule.addDeserializer(Video.class, new YoutubeVideoDeserializer());
-    objectMapper.registerModule(simpleModule);
-    objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-
-    youtubeActivityUtil = new YoutubeActivityUtil();
-  }
-
-  /**
-   * Reads the sample JSON as a Video, applies it to a new Activity with the channel id
-   * "testChannelId", and checks the id, verb, provider, actor, title, url, content, published
-   * type, extensions and object of that Activity.
-   */
-  @Test
-  public void testVideoObject() {
-    LOGGER.info("raw: {}", testVideo);
-
-    try {
-      Activity activity = new Activity();
-
-      Video video = objectMapper.readValue(testVideo, Video.class);
-
-      youtubeActivityUtil.updateActivity(video, activity, "testChannelId");
-      LOGGER.info("activity: {}", activity);
-
-      assertNotNull(activity);
-      // NOTE(review): the Java assert keyword is only evaluated when the JVM runs with
-      // assertions enabled (-ea); otherwise this check is skipped.
-      assert (activity.getId().contains("id:youtube:post"));
-      assertEquals(activity.getVerb(), "post");
-
-      Provider provider = activity.getProvider();
-      assertEquals(provider.getId(), "id:providers:youtube");
-      assertEquals(provider.getDisplayName(), "YouTube");
-
-      ActivityObject actor = activity.getActor();
-      // Same caveat as above: language-level assert, not a JUnit assertion.
-      assert (actor.getId().contains("id:youtube:"));
-      assertNotNull(actor.getDisplayName());
-      assertNotNull(actor.getSummary());
-
-      assertNotNull(activity.getTitle());
-      assertNotNull(activity.getUrl());
-      assertNotNull(activity.getContent());
-
-      assertEquals(activity.getPublished().getClass(), DateTime.class);
-
-      Map<String, Object> extensions = ExtensionUtil.getInstance().ensureExtensions(activity);
-
-      assertNotNull(extensions.get("youtube"));
-      assertNotNull(extensions.get("likes"));
-
-      assertTrue(testActivityObject(activity));
-    } catch (Exception ex) {
-      // NOTE(review): the exception is only logged, never rethrown, so an exception raised above
-      // (for example while deserializing) does not fail this test. Failed assertions are
-      // AssertionErrors and are not caught here.
-      LOGGER.error("Exception while testing the Ser/De functionality of the Video deserializer: {}", ex);
-    }
-  }
-
-  /**
-   * Checks the object of the given activity.
-   *
-   * @param activity activity whose object is inspected
-   * @return true only when the object type is "video" and the image and url conditions below
-   *     all hold
-   */
-  private boolean testActivityObject(Activity activity) {
-    boolean valid = false;
-
-    ActivityObject obj = activity.getObject();
-
-    // NOTE(review): equals(null) is false for any equals() that honours the Object contract, so
-    // the image clause cannot detect a missing image; a null image would raise a
-    // NullPointerException instead. The url clause compares against the literal text "null".
-    if ( obj.getObjectType().equals("video")
-        && !obj.getImage().equals(null)
-        && !obj.getUrl().equals("null")
-        && obj.getUrl().contains("https://www.youtube.com/watch?v=")) {
-      valid = true;
-    }
-
-    return valid;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/processor/YoutubeTypeConverterTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/processor/YoutubeTypeConverterTest.java b/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/processor/YoutubeTypeConverterTest.java
new file mode 100644
index 0000000..fc5a309
--- /dev/null
+++ b/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/processor/YoutubeTypeConverterTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.youtube.processor;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.youtube.serializer.YoutubeActivityUtil;
+import org.apache.streams.youtube.serializer.YoutubeVideoDeserializer;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.api.services.youtube.model.Video;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test for YoutubeTypeConverter.
+ *
+ * <p>Checks that the converter turns a datum holding a YouTube video, either as a Video object
+ * or as its raw JSON string, into the same Activity that YoutubeActivityUtil builds for that
+ * video.
+ */
+public class YoutubeTypeConverterTest {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(YoutubeTypeConverterTest.class);
+  private final String testVideo = "{\"etag\":\"\\\"4FSIjSQU83ZJMYAO0IqRYMvZX98/V0q3OIauZ3ZAkszLUDbHL45yEGM\\\"\",\"id\":\"sUOepRctwVE\",\"kind\":\"youtube#video\",\"snippet\":{\"channelId\":\"UCNENOn2nmwguQYkejKhJGPQ\",\"channelTitle\":\"Carilion Clinic\",\"description\":\"Join Carilion Clinic's Heart Failure experts for a LIVE Google+ Hangout on Feb. 23, 12:30-1 p.m. to learn more about heart failure, treatment options, and lifestyle changes. Learn more: https://plus.google.com/u/0/events/cj074q9r6csgv6i2kqhi2isc6k0\",\"publishedAt\":{\"value\":1422977409000,\"dateOnly\":false,\"timeZoneShift\":-360},\"thumbnails\":{\"default\":{\"height\":480,\"url\":\"https://i.ytimg.com/vi/sUOepRctwVE/sddefault.jpg\",\"width\":640}},\"title\":\"Be Heart Smart: Congestive Heart Failure LIVE Event\"},\"statistics\":{\"commentCount\":1,\"dislikeCount\":0,\"favoriteCount\":0,\"likeCount\":0,\"viewCount\":9}}";
+
+  private YoutubeTypeConverter youtubeTypeConverter;
+  private ObjectMapper objectMapper;
+
+  /**
+   * Setup for test: configures a mapper that reads Video through YoutubeVideoDeserializer and
+   * does not fail on unknown properties, then creates and prepares the converter.
+   */
+  @Before
+  public void setup() {
+    objectMapper = StreamsJacksonMapper.getInstance();
+    SimpleModule simpleModule = new SimpleModule();
+    simpleModule.addDeserializer(Video.class, new YoutubeVideoDeserializer());
+    objectMapper.registerModule(simpleModule);
+    objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+
+    youtubeTypeConverter = new YoutubeTypeConverter();
+    youtubeTypeConverter.prepare(null);
+  }
+
+  /**
+   * A datum wrapping a deserialized Video must convert to the expected Activity.
+   *
+   * @throws Exception if deserialization or conversion fails; propagated so the test fails
+   *     instead of being logged and ignored
+   */
+  @Test
+  public void testVideoConversion() throws Exception {
+    Video video = readTestVideo();
+
+    assertConvertsToExpectedActivity(new StreamsDatum(video), video);
+  }
+
+  /**
+   * A datum wrapping the raw JSON string must convert to the expected Activity.
+   *
+   * @throws Exception if deserialization or conversion fails; propagated so the test fails
+   *     instead of being logged and ignored
+   */
+  @Test
+  public void testStringVideoConversion() throws Exception {
+    assertConvertsToExpectedActivity(new StreamsDatum(testVideo), readTestVideo());
+  }
+
+  /**
+   * Deserializes the sample JSON document into a Video.
+   */
+  private Video readTestVideo() throws Exception {
+    return objectMapper.readValue(testVideo, Video.class);
+  }
+
+  /**
+   * Runs the converter on the datum and compares its single output with the Activity that
+   * YoutubeActivityUtil builds from the given video.
+   *
+   * @param streamsDatum datum handed to the converter
+   * @param video video used to build the expected Activity
+   * @throws Exception if conversion or building the expected Activity fails
+   */
+  private void assertConvertsToExpectedActivity(StreamsDatum streamsDatum, Video video)
+      throws Exception {
+    LOGGER.info("raw: {}", testVideo);
+    assertNotNull(streamsDatum.getDocument());
+
+    List<StreamsDatum> retList = youtubeTypeConverter.process(streamsDatum);
+
+    // The expected activity is built after the conversion, matching the original call order.
+    Activity expected = new Activity();
+    YoutubeActivityUtil.updateActivity(video, expected, "testChannelId");
+
+    assertEquals(1, retList.size());
+    Object converted = retList.get(0).getDocument();
+    // assertTrue rather than the assert keyword, which is skipped unless the JVM runs with -ea.
+    assertTrue(converted instanceof Activity);
+    assertEquals(expected, converted);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/provider/YoutubeChannelDataCollectorTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/provider/YoutubeChannelDataCollectorTest.java b/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/provider/YoutubeChannelDataCollectorTest.java
new file mode 100644
index 0000000..b291760
--- /dev/null
+++ b/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/provider/YoutubeChannelDataCollectorTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.youtube.provider;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.google.gplus.configuration.UserInfo;
+import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
+import org.apache.streams.util.api.requests.backoff.impl.LinearTimeBackOffStrategy;
+import org.apache.streams.youtube.YoutubeConfiguration;
+
+import com.google.api.services.youtube.YouTube;
+import com.google.api.services.youtube.model.Channel;
+import com.google.api.services.youtube.model.ChannelListResponse;
+import org.junit.Test;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Exercises YoutubeChannelDataCollector against a Mockito-backed YouTube client.
+ */
+public class YoutubeChannelDataCollectorTest {
+
+  private static final String ID = "12345";
+
+  /**
+   * Runs the collector once against a stubbed client whose channel listing holds one channel,
+   * then checks that exactly one datum with a non-null String document was queued.
+   */
+  @Test
+  public void testDataCollector() throws Exception {
+    UserInfo user = new UserInfo();
+    user.setUserId(ID);
+    YoutubeConfiguration configuration = new YoutubeConfiguration();
+    configuration.setApiKey(ID);
+    BackOffStrategy backOff = new LinearTimeBackOffStrategy(1);
+    BlockingQueue<StreamsDatum> output = new LinkedBlockingQueue<>();
+    YouTube client = stubbedYouTube();
+
+    new YoutubeChannelDataCollector(client, output, backOff, user, configuration).run();
+
+    assertEquals(1, output.size());
+    StreamsDatum first = output.take();
+    assertNotNull(first);
+    String document = (String) first.getDocument();
+    assertNotNull(document);
+  }
+
+  /**
+   * Builds the mock chain bottom-up: a list request that returns itself from setId and setKey
+   * and yields the canned response on execute, a channels object that returns that request
+   * from list, and a client that returns that channels object.
+   */
+  private YouTube stubbedYouTube() throws Exception {
+    ChannelListResponse canned = singleChannelResponse();
+
+    YouTube.Channels.List listRequest = mock(YouTube.Channels.List.class);
+    when(listRequest.setId(anyString())).thenReturn(listRequest);
+    when(listRequest.setKey(anyString())).thenReturn(listRequest);
+    when(listRequest.execute()).thenReturn(canned);
+
+    YouTube.Channels channelsApi = mock(YouTube.Channels.class);
+    when(channelsApi.list(anyString())).thenReturn(listRequest);
+
+    YouTube client = mock(YouTube.class);
+    when(client.channels()).thenReturn(channelsApi);
+    return client;
+  }
+
+  /**
+   * Creates a response whose item list contains a single channel with the id ID.
+   */
+  private ChannelListResponse singleChannelResponse() {
+    Channel onlyChannel = new Channel();
+    onlyChannel.setId(ID);
+
+    List<Channel> items = new LinkedList<>();
+    ChannelListResponse canned = new ChannelListResponse();
+    canned.setItems(items);
+    items.add(onlyChannel);
+    return canned;
+  }
+
+}