You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by mf...@apache.org on 2014/04/30 16:58:06 UTC

[1/2] git commit: Isolated to new package.

Repository: incubator-streams
Updated Branches:
  refs/heads/master be58524bf -> e6ffe29e8


Isolated to new package.


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/40ab1593
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/40ab1593
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/40ab1593

Branch: refs/heads/master
Commit: 40ab15939465f8ceafd1681fff23ebd2f01e35f6
Parents: be58524
Author: mfranklin <mf...@apache.org>
Authored: Tue Apr 29 11:44:57 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Tue Apr 29 11:44:57 2014 -0400

----------------------------------------------------------------------
 .../streams/sysomos/AbstractRequestBuilder.java |  57 ------
 .../streams/sysomos/ContentRequestBuilder.java  | 134 -------------
 .../apache/streams/sysomos/RequestBuilder.java  |  96 ----------
 .../apache/streams/sysomos/SysomosClient.java   |  63 ------
 .../streams/sysomos/SysomosHeartbeatStream.java | 127 ------------
 .../apache/streams/sysomos/SysomosProvider.java | 191 -------------------
 .../provider/AbstractRequestBuilder.java        |  58 ++++++
 .../sysomos/provider/ContentRequestBuilder.java | 135 +++++++++++++
 .../sysomos/provider/RequestBuilder.java        |  96 ++++++++++
 .../streams/sysomos/provider/SysomosClient.java |  55 ++++++
 .../provider/SysomosHeartbeatStream.java        | 127 ++++++++++++
 .../sysomos/provider/SysomosProvider.java       | 191 +++++++++++++++++++
 12 files changed, 662 insertions(+), 668 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/40ab1593/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/AbstractRequestBuilder.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/AbstractRequestBuilder.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/AbstractRequestBuilder.java
deleted file mode 100644
index 0699a1a..0000000
--- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/AbstractRequestBuilder.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.sysomos;
-
-import com.sysomos.xml.BeatApi;
-import com.sysomos.xml.ObjectFactory;
-import org.apache.streams.sysomos.util.SysomosUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.xml.bind.JAXBContext;
-import javax.xml.bind.JAXBException;
-import javax.xml.bind.Unmarshaller;
-import java.io.StringReader;
-import java.net.URL;
-
-/**
- * Defines a common pattern for requesting data from the Sysomos API.
- */
-public abstract class AbstractRequestBuilder implements RequestBuilder {
-    private final static Logger LOGGER = LoggerFactory.getLogger(AbstractRequestBuilder.class);
-
-    /**
-     * Executes the request to the Sysomos Heartbeat API and returns a valid response
-     */
-    public BeatApi.BeatResponse execute() {
-        URL url = this.getRequestUrl();
-        try {
-            String xmlResponse = SysomosUtils.queryUrl(url);
-            JAXBContext context = JAXBContext.newInstance(ObjectFactory.class);
-            Unmarshaller unmarshaller = context.createUnmarshaller();
-            BeatApi beatApi = (BeatApi) unmarshaller.unmarshal(new StringReader(xmlResponse));
-            return beatApi.getBeatResponse();
-        } catch (JAXBException e) {
-            LOGGER.error("Unable to unmarshal XML content");
-            throw new SysomosException("Unable to unmarshal XML content", e);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/40ab1593/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/ContentRequestBuilder.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/ContentRequestBuilder.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/ContentRequestBuilder.java
deleted file mode 100644
index d632ad6..0000000
--- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/ContentRequestBuilder.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.streams.sysomos;
-
-import org.joda.time.DateTime;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.MalformedURLException;
-import java.net.URL;
-
-import static org.apache.streams.sysomos.util.SysomosUtils.*;
-
-/**
- * Builds requests for the Sysomos Heartbeat Content API.  This is the preferred method of
- * accessing data from Sysomoos Heartbeat
- */
-public class ContentRequestBuilder extends AbstractRequestBuilder implements RequestBuilder {
-
-    private final static Logger LOGGER = LoggerFactory.getLogger(ContentRequestBuilder.class);
-
-    private String baseUrl;
-    private String hid;
-    private String addedAfter;
-    private String addedBefore;
-    private String size;
-    private String offset;
-    private String apiKey;
-
-    /**
-     * The max number of items you are allowed to get per request.
-     */
-    public static final int MAX_ALLOWED_PER_REQUEST = 10000;
-
-    /**
-     * Constructs a new ContentRequestBuilder for the specified API key and Sysomos URL
-     * @param baseUrl the base URL for the sysomos API
-     * @param apiKey the API key generated by Sysomos for authorization
-     */
-    protected ContentRequestBuilder(String baseUrl, String apiKey) {
-        this.baseUrl = baseUrl;
-        this.apiKey = apiKey;
-    }
-
-    /**
-     * Gets the Request URL based on the local fields
-     * @return a valid URL for the Sysomos API or an exception
-     */
-    @Override
-    public URL getRequestUrl()  {
-        StringBuilder url = new StringBuilder();
-        url.append(this.baseUrl);
-        url.append("dev/v1/heartbeat/content?");
-        url.append("apiKey=");
-        url.append(this.apiKey);
-        url.append("&hid=");
-        url.append(this.hid);
-        if (size != null) {
-            url.append("&size=");
-            url.append(this.size);
-        }
-        if (this.addedAfter != null) {
-            url.append("&addedAfter=");
-            url.append(this.addedAfter);
-        }
-        if (this.addedBefore != null) {
-            url.append("&addedBefore=");
-            url.append(this.addedBefore);
-        }
-        if (this.offset != null) {
-            url.append("&offset=");
-            url.append(this.offset);
-        }
-        String urlString = url.toString();
-        LOGGER.debug("Constructed Request URL: {}", urlString);
-        try {
-            return new URL(urlString);
-        } catch (MalformedURLException e) {
-            throw new SysomosException("Malformed Request URL.  Check Request Builder parameters", e);
-        }
-    }
-
-    @Override
-    public RequestBuilder setHeartBeatId(int hid) {
-        return setHeartBeatId(Integer.toString(hid));
-    }
-
-    @Override
-    public RequestBuilder setHeartBeatId(String hid) {
-        this.hid = hid;
-        return this;
-    }
-
-    @Override
-    public RequestBuilder setAddedAfterDate(DateTime afterDate) {
-        this.addedAfter = SYSOMOS_DATE_FORMATTER.print(afterDate);
-        return this;
-    }
-
-    @Override
-    public RequestBuilder setAddedBeforeDate(DateTime beforeDate) {
-        this.addedBefore = SYSOMOS_DATE_FORMATTER.print(beforeDate);
-        return this;
-    }
-
-    @Override
-    public RequestBuilder setReturnSetSize(long size) {
-        this.size = Long.toString(Math.min(size, MAX_ALLOWED_PER_REQUEST));
-        return this;
-    }
-
-    @Override
-    public RequestBuilder setOffset(long offset) {
-        this.offset = Long.toString(offset);
-        return this;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/40ab1593/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/RequestBuilder.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/RequestBuilder.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/RequestBuilder.java
deleted file mode 100644
index 8c31d88..0000000
--- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/RequestBuilder.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.sysomos;
-
-import com.sysomos.xml.BeatApi;
-import org.joda.time.DateTime;
-
-import java.net.URL;
-
-/**
- * Simplifying abstraction that aids in building a request to the Sysomos API in a chained fashion
- */
-public interface RequestBuilder {
-    /**
-     * Sets the date after which documents should be returned from Sysomos
-     * @param afterDate the {@link org.joda.time.DateTime} instance representing the after date
-     *
-     * @return The RequestBuilder for continued Chaining
-     */
-    RequestBuilder setAddedAfterDate(DateTime afterDate);
-
-    /**
-     * Sets the date before which documents should be returned from Sysomos
-     * @param beforeDate the {@link org.joda.time.DateTime} instance representing the before date
-     *
-     * @return The RequestBuilder for continued Chaining
-     */
-    RequestBuilder setAddedBeforeDate(DateTime beforeDate);
-
-    /**
-     * Sets the size of the expected response
-     * @param size the number of documents
-     *
-     * @return The RequestBuilder for continued Chaining
-     */
-    RequestBuilder setReturnSetSize(long size);
-
-    /**
-     * Sets the starting offset for the number of documents given the other parameters
-     * @param offset the starting offset
-     *
-     * @return The RequestBuilder for continued Chaining
-     */
-    RequestBuilder setOffset(long offset);
-
-    /**
-     * Sets the Sysomos Heartbeat ID
-     * @param hid Heartbeat ID
-     *
-     * @return The RequestBuilder for continued Chaining
-     */
-    RequestBuilder setHeartBeatId(int hid);
-
-    /**
-     *
-     * Sets the Sysomos Heartbeat ID as a String
-     * @param hid Heartbeat ID string
-     *
-     * @return The RequestBuilder for continued Chaining
-     */
-    RequestBuilder setHeartBeatId(String hid);
-
-    /**
-     * Returns the full url need to execute a request.
-     *
-     * Example:
-     * http://api.sysomos.com/dev/v1/heartbeat/content?apiKey=YOUR
-     * -APIKEY&hid=YOUR-HEARTBEAT-ID&offset=0&size=10&
-     * addedAfter=2010-10-15T13:00:00Z&addedBefore=2010-10-18T13:00:00Z
-     *
-     * @return the URL to use when making requests of Sysomos Heartbeat
-     */
-    URL getRequestUrl();
-
-    /**
-     * Executes the request to the Sysomos Heartbeat API and returns a valid response
-     */
-    BeatApi.BeatResponse execute();
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/40ab1593/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/SysomosClient.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/SysomosClient.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/SysomosClient.java
deleted file mode 100644
index 2fe082a..0000000
--- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/SysomosClient.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.sysomos;
-
-import com.google.common.base.Strings;
-import com.sysomos.json.Sysomos;
-import org.apache.commons.io.IOUtils;
-import org.apache.streams.sysomos.data.HeartbeatInfo;
-import org.apache.streams.sysomos.util.SysomosUtils;
-
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.StringWriter;
-import java.net.HttpURLConnection;
-import java.net.URL;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-/**
- * Wrapper for the Sysomos API.
- */
-public class SysomosClient {
-
-    public static final String BASE_URL_STRING = "http://api.sysomos.com/";
-    private static final String HEARTBEAT_INFO_URL = "http://api.sysomos.com/v1/heartbeat/info?apiKey={api_key}&hid={hid}";
-
-    private String apiKey;
-
-    private HttpURLConnection client;
-
-    public SysomosClient(String apiKey) {
-        this.apiKey = apiKey;
-    }
-
-    public HeartbeatInfo getHeartbeatInfo(String hid) throws Exception {
-        String urlString = HEARTBEAT_INFO_URL.replace("{api_key}", this.apiKey);
-        urlString = urlString.replace("{hid}", hid);
-        String xmlResponse = SysomosUtils.queryUrl(new URL(urlString));
-        return new HeartbeatInfo(xmlResponse);
-    }
-
-    public RequestBuilder createRequestBuilder() {
-        return new ContentRequestBuilder(BASE_URL_STRING, this.apiKey);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/40ab1593/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/SysomosHeartbeatStream.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/SysomosHeartbeatStream.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/SysomosHeartbeatStream.java
deleted file mode 100644
index e552ecd..0000000
--- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/SysomosHeartbeatStream.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.sysomos;
-
-import com.sysomos.xml.BeatApi;
-import org.apache.streams.core.StreamsDatum;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Provides a {@link java.lang.Runnable} query mechanism for grabbing documents from the Sysomos API
- */
-public class SysomosHeartbeatStream implements Runnable {
-
-    private final static Logger LOGGER = LoggerFactory.getLogger(SysomosHeartbeatStream.class);
-
-    private final SysomosProvider provider;
-    private final SysomosClient client;
-    private final String heartbeatId;
-    private final long maxApiBatch;
-    private final long minLatency;
-
-    private String lastID = null;
-
-    public SysomosHeartbeatStream(SysomosProvider provider, String heartbeatId) {
-        this.provider = provider;
-        this.heartbeatId = heartbeatId;
-
-        this.client = provider.getClient();
-        this.maxApiBatch = provider.getMaxApiBatch();
-        this.minLatency = provider.getMinLatency();
-    }
-
-    @Override
-    public void run() {
-        QueryResult result;
-        //Iff we are trying to get to a specific document ID, continue to query after minimum delay
-        do {
-            LOGGER.debug("Querying API to match last ID of {}", lastID);
-            result = executeAPIRequest();
-            sleep();
-        } while (lastID != null && !result.isMatchedLastId());
-        //Set the last ID so that the next time we are executed we will continue to query only so long as we haven't
-        //found the specific ID
-        lastID = result.getCurrentId();
-        LOGGER.debug("Completed current execution with a final docID of {}", lastID);
-    }
-
-    protected void sleep() {
-        try {
-            Thread.sleep(this.minLatency);
-        } catch (InterruptedException e) {
-            LOGGER.warn("Thread interrupted while sleeping minimum delay", e);
-        }
-    }
-
-    protected QueryResult executeAPIRequest() {
-        BeatApi.BeatResponse response = this.client.createRequestBuilder()
-                .setHeartBeatId(heartbeatId)
-                .setOffset(0)
-                .setReturnSetSize(maxApiBatch).execute();
-
-        LOGGER.debug("Received {} results from API query", response.getCount());
-
-        String currentId = null;
-        boolean matched = false;
-        for(BeatApi.BeatResponse.Beat beat : response.getBeat()) {
-            String docId = beat.getDocid();
-            //We get documents in descending time order.  This will set the id to the latest document
-            if(currentId == null) {
-                currentId = docId;
-            }
-            //We only want to process documents that we know we have not seen before
-            if(lastID != null && lastID.equals(docId)) {
-                matched = true;
-                break;
-            }
-            StreamsDatum item = new StreamsDatum(beat, docId);
-            item.getMetadata().put("heartbeat", this.heartbeatId);
-            this.provider.enqueueItem(item);
-        }
-        return new QueryResult(matched, currentId);
-    }
-
-    private class QueryResult {
-        private boolean matchedLastId;
-        private String currentId;
-
-        private QueryResult(boolean matchedLastId, String currentId) {
-            this.matchedLastId = matchedLastId;
-            this.currentId = currentId;
-        }
-
-        public boolean isMatchedLastId() {
-            return matchedLastId;
-        }
-
-        public void setMatchedLastId(boolean matchedLastId) {
-            this.matchedLastId = matchedLastId;
-        }
-
-        public String getCurrentId() {
-            return currentId;
-        }
-
-        public void setCurrentId(String currentId) {
-            this.currentId = currentId;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/40ab1593/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/SysomosProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/SysomosProvider.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/SysomosProvider.java
deleted file mode 100644
index 9f12cc7..0000000
--- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/SysomosProvider.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.sysomos;
-
-import com.google.common.collect.Queues;
-import com.sysomos.SysomosConfiguration;
-import org.apache.commons.lang.NotImplementedException;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProvider;
-import org.apache.streams.core.StreamsResultSet;
-import org.joda.time.DateTime;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.math.BigInteger;
-import java.util.Queue;
-import java.util.concurrent.*;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-/**
- * Streams Provider for the Sysomos Heartbeat API
- */
-public class SysomosProvider implements StreamsProvider {
-
-    public final static String STREAMS_ID = "SysomosProvider";
-
-    private final static Logger LOGGER = LoggerFactory.getLogger(SysomosProvider.class);
-
-    public static final int LATENCY = 10000;  //Default minLatency for querying the Sysomos API in milliseconds
-    public static final long PROVIDER_BATCH_SIZE = 10000L; //Default maximum size of the queue
-    public static final long API_BATCH_SIZE = 1000L; //Default maximum size of an API request
-
-    protected volatile Queue<StreamsDatum> providerQueue;
-
-    private final ReadWriteLock lock = new ReentrantReadWriteLock();
-    private final long maxQueued;
-    private final long minLatency;
-    private final long scheduledLatency;
-    private final long maxApiBatch;
-
-    private SysomosClient client;
-    private SysomosConfiguration config;
-    private ScheduledExecutorService stream;
-    private boolean started = false;
-
-    public SysomosProvider(SysomosConfiguration sysomosConfiguration) {
-        this.config = sysomosConfiguration;
-        this.client = new SysomosClient(sysomosConfiguration.getApiKey());
-        this.maxQueued = sysomosConfiguration.getMaxBatchSize() == null ? PROVIDER_BATCH_SIZE : sysomosConfiguration.getMaxBatchSize();
-        this.minLatency = sysomosConfiguration.getMinDelayMs() == null ? LATENCY : sysomosConfiguration.getMinDelayMs();
-        this.scheduledLatency = sysomosConfiguration.getScheduledDelayMs() == null ? (LATENCY * 15) : sysomosConfiguration.getScheduledDelayMs();
-        this.maxApiBatch = sysomosConfiguration.getMinDelayMs() == null ? API_BATCH_SIZE : sysomosConfiguration.getApiBatchSize();
-    }
-
-    public SysomosConfiguration getConfig() {
-        return config;
-    }
-
-    public void setConfig(SysomosConfiguration config) {
-        this.config = config;
-    }
-
-    @Override
-    public void startStream() {
-        LOGGER.trace("Starting Producer");
-        if (!started) {
-            LOGGER.trace("Producer not started.  Initializing");
-            stream = Executors.newScheduledThreadPool(getConfig().getHeartbeatIds().size() + 1);
-            for (String heartbeatId : getConfig().getHeartbeatIds()) {
-                Runnable task = new SysomosHeartbeatStream(this, heartbeatId);
-                stream.scheduleWithFixedDelay(task, 0, this.scheduledLatency, TimeUnit.MILLISECONDS);
-                LOGGER.info("Started producer task for heartbeat {}", heartbeatId);
-            }
-            started = true;
-        }
-    }
-
-    @Override
-    public StreamsResultSet readCurrent() {
-        StreamsResultSet current;
-        try {
-            lock.writeLock().lock();
-            LOGGER.debug("Creating new result set for {} items", providerQueue.size());
-            current = new StreamsResultSet(providerQueue);
-            providerQueue = constructQueue();
-        } finally {
-            lock.writeLock().unlock();
-        }
-
-        return current;
-    }
-
-    @Override
-    public StreamsResultSet readNew(BigInteger bigInteger) {
-        throw new NotImplementedException("readNew not currently implemented");
-    }
-
-    @Override
-    public StreamsResultSet readRange(DateTime dateTime, DateTime dateTime2) {
-        throw new NotImplementedException("readRange not currently implemented");
-    }
-
-    @Override
-    public void prepare(Object configurationObject) {
-        this.providerQueue = constructQueue();
-    }
-
-    @Override
-    public void cleanUp() {
-        stream.shutdown(); // Disable new tasks from being submitted
-        try {
-            // Wait a while for existing tasks to terminate
-            if (!stream.awaitTermination(60, TimeUnit.SECONDS)) {
-                stream.shutdownNow(); // Cancel currently executing tasks
-                // Wait a while for tasks to respond to being cancelled
-                if (!stream.awaitTermination(60, TimeUnit.SECONDS)) {
-                    LOGGER.error("Stream did not terminate");
-                }
-            }
-        } catch (InterruptedException ie) {
-            // (Re-)Cancel if current thread also interrupted
-            stream.shutdownNow();
-            // Preserve interrupt status
-            Thread.currentThread().interrupt();
-        }
-    }
-
-    public long getMinLatency() {
-        return minLatency;
-    }
-
-    public long getMaxApiBatch() {
-        return maxApiBatch;
-    }
-
-    public SysomosClient getClient() {
-        return client;
-    }
-
-    protected void enqueueItem(StreamsDatum datum) {
-        boolean success;
-        do {
-            try {
-                pauseForSpace(); //Dont lock before this pause. We don't want to block the readCurrent method
-                lock.readLock().lock();
-                success = providerQueue.offer(datum);
-                Thread.yield();
-            }finally {
-                lock.readLock().unlock();
-            }
-        }
-        while (!success);
-    }
-
-    /**
-     * Wait for the queue size to be below threshold before allowing execution to continue on this thread
-     */
-    private void pauseForSpace() {
-        while(this.providerQueue.size() >= maxQueued) {
-            LOGGER.trace("Sleeping the current thread due to a full queue");
-            try {
-                Thread.sleep(100);
-                LOGGER.trace("Resuming thread after wait period");
-            } catch (InterruptedException e) {
-                LOGGER.warn("Thread was interrupted", e);
-            }
-        }
-    }
-
-    private Queue<StreamsDatum> constructQueue() {
-        return Queues.newConcurrentLinkedQueue();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/40ab1593/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/AbstractRequestBuilder.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/AbstractRequestBuilder.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/AbstractRequestBuilder.java
new file mode 100644
index 0000000..28f33df
--- /dev/null
+++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/AbstractRequestBuilder.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.sysomos.provider;
+
+import com.sysomos.xml.BeatApi;
+import com.sysomos.xml.ObjectFactory;
+import org.apache.streams.sysomos.SysomosException;
+import org.apache.streams.sysomos.util.SysomosUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Unmarshaller;
+import java.io.StringReader;
+import java.net.URL;
+
+/**
+ * Defines a common pattern for requesting data from the Sysomos API.
+ */
+public abstract class AbstractRequestBuilder implements RequestBuilder {
+    private final static Logger LOGGER = LoggerFactory.getLogger(AbstractRequestBuilder.class);
+
+    /**
+     * Executes the request to the Sysomos Heartbeat API and returns a valid response
+     */
+    public BeatApi.BeatResponse execute() {
+        URL url = this.getRequestUrl();
+        try {
+            String xmlResponse = SysomosUtils.queryUrl(url);
+            JAXBContext context = JAXBContext.newInstance(ObjectFactory.class);
+            Unmarshaller unmarshaller = context.createUnmarshaller();
+            BeatApi beatApi = (BeatApi) unmarshaller.unmarshal(new StringReader(xmlResponse));
+            return beatApi.getBeatResponse();
+        } catch (JAXBException e) {
+            LOGGER.error("Unable to unmarshal XML content");
+            throw new SysomosException("Unable to unmarshal XML content", e);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/40ab1593/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/ContentRequestBuilder.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/ContentRequestBuilder.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/ContentRequestBuilder.java
new file mode 100644
index 0000000..79c7f12
--- /dev/null
+++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/ContentRequestBuilder.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.streams.sysomos.provider;
+
+import org.apache.streams.sysomos.SysomosException;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+
+import static org.apache.streams.sysomos.util.SysomosUtils.*;
+
+/**
+ * Builds requests for the Sysomos Heartbeat Content API.  This is the preferred method of
+ * accessing data from Sysomoos Heartbeat
+ */
+public class ContentRequestBuilder extends AbstractRequestBuilder implements RequestBuilder {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(ContentRequestBuilder.class);
+
+    private String baseUrl;
+    private String hid;
+    private String addedAfter;
+    private String addedBefore;
+    private String size;
+    private String offset;
+    private String apiKey;
+
+    /**
+     * The max number of items you are allowed to get per request.
+     */
+    public static final int MAX_ALLOWED_PER_REQUEST = 10000;
+
+    /**
+     * Constructs a new ContentRequestBuilder for the specified API key and Sysomos URL
+     * @param baseUrl the base URL for the sysomos API
+     * @param apiKey the API key generated by Sysomos for authorization
+     */
+    protected ContentRequestBuilder(String baseUrl, String apiKey) {
+        this.baseUrl = baseUrl;
+        this.apiKey = apiKey;
+    }
+
+    /**
+     * Gets the Request URL based on the local fields
+     * @return a valid URL for the Sysomos API or an exception
+     */
+    @Override
+    public URL getRequestUrl()  {
+        StringBuilder url = new StringBuilder();
+        url.append(this.baseUrl);
+        url.append("dev/v1/heartbeat/content?");
+        url.append("apiKey=");
+        url.append(this.apiKey);
+        url.append("&hid=");
+        url.append(this.hid);
+        if (size != null) {
+            url.append("&size=");
+            url.append(this.size);
+        }
+        if (this.addedAfter != null) {
+            url.append("&addedAfter=");
+            url.append(this.addedAfter);
+        }
+        if (this.addedBefore != null) {
+            url.append("&addedBefore=");
+            url.append(this.addedBefore);
+        }
+        if (this.offset != null) {
+            url.append("&offset=");
+            url.append(this.offset);
+        }
+        String urlString = url.toString();
+        LOGGER.debug("Constructed Request URL: {}", urlString);
+        try {
+            return new URL(urlString);
+        } catch (MalformedURLException e) {
+            throw new SysomosException("Malformed Request URL.  Check Request Builder parameters", e);
+        }
+    }
+
+    @Override
+    public RequestBuilder setHeartBeatId(int hid) {
+        return setHeartBeatId(Integer.toString(hid));
+    }
+
+    @Override
+    public RequestBuilder setHeartBeatId(String hid) {
+        this.hid = hid;
+        return this;
+    }
+
+    @Override
+    public RequestBuilder setAddedAfterDate(DateTime afterDate) {
+        this.addedAfter = SYSOMOS_DATE_FORMATTER.print(afterDate);
+        return this;
+    }
+
+    @Override
+    public RequestBuilder setAddedBeforeDate(DateTime beforeDate) {
+        this.addedBefore = SYSOMOS_DATE_FORMATTER.print(beforeDate);
+        return this;
+    }
+
+    @Override
+    public RequestBuilder setReturnSetSize(long size) {
+        this.size = Long.toString(Math.min(size, MAX_ALLOWED_PER_REQUEST));
+        return this;
+    }
+
+    @Override
+    public RequestBuilder setOffset(long offset) {
+        this.offset = Long.toString(offset);
+        return this;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/40ab1593/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/RequestBuilder.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/RequestBuilder.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/RequestBuilder.java
new file mode 100644
index 0000000..0e12025
--- /dev/null
+++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/RequestBuilder.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.sysomos.provider;
+
+import com.sysomos.xml.BeatApi;
+import org.joda.time.DateTime;
+
+import java.net.URL;
+
+/**
+ * Simplifying abstraction that aids in building a request to the Sysomos API in a chained fashion
+ */
+public interface RequestBuilder {
+    /**
+     * Sets the date after which documents should be returned from Sysomos
+     * @param afterDate the {@link org.joda.time.DateTime} instance representing the after date
+     *
+     * @return The RequestBuilder for continued Chaining
+     */
+    RequestBuilder setAddedAfterDate(DateTime afterDate);
+
+    /**
+     * Sets the date before which documents should be returned from Sysomos
+     * @param beforeDate the {@link org.joda.time.DateTime} instance representing the before date
+     *
+     * @return The RequestBuilder for continued Chaining
+     */
+    RequestBuilder setAddedBeforeDate(DateTime beforeDate);
+
+    /**
+     * Sets the size of the expected response
+     * @param size the number of documents
+     *
+     * @return The RequestBuilder for continued Chaining
+     */
+    RequestBuilder setReturnSetSize(long size);
+
+    /**
+     * Sets the starting offset for the number of documents given the other parameters
+     * @param offset the starting offset
+     *
+     * @return The RequestBuilder for continued Chaining
+     */
+    RequestBuilder setOffset(long offset);
+
+    /**
+     * Sets the Sysomos Heartbeat ID
+     * @param hid Heartbeat ID
+     *
+     * @return The RequestBuilder for continued Chaining
+     */
+    RequestBuilder setHeartBeatId(int hid);
+
+    /**
+     *
+     * Sets the Sysomos Heartbeat ID as a String
+     * @param hid Heartbeat ID string
+     *
+     * @return The RequestBuilder for continued Chaining
+     */
+    RequestBuilder setHeartBeatId(String hid);
+
+    /**
+     * Returns the full url need to execute a request.
+     *
+     * Example:
+     * http://api.sysomos.com/dev/v1/heartbeat/content?apiKey=YOUR
+     * -APIKEY&hid=YOUR-HEARTBEAT-ID&offset=0&size=10&
+     * addedAfter=2010-10-15T13:00:00Z&addedBefore=2010-10-18T13:00:00Z
+     *
+     * @return the URL to use when making requests of Sysomos Heartbeat
+     */
+    URL getRequestUrl();
+
+    /**
+     * Executes the request to the Sysomos Heartbeat API and returns a valid response
+     */
+    BeatApi.BeatResponse execute();
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/40ab1593/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosClient.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosClient.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosClient.java
new file mode 100644
index 0000000..488b6c7
--- /dev/null
+++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosClient.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.sysomos.provider;
+
+import org.apache.streams.sysomos.data.HeartbeatInfo;
+import org.apache.streams.sysomos.util.SysomosUtils;
+
+import java.net.HttpURLConnection;
+import java.net.URL;
+
+/**
+ * Wrapper for the Sysomos API.
+ */
+public class SysomosClient {
+
+    public static final String BASE_URL_STRING = "http://api.sysomos.com/";
+    private static final String HEARTBEAT_INFO_URL = "http://api.sysomos.com/v1/heartbeat/info?apiKey={api_key}&hid={hid}";
+
+    private String apiKey;
+
+    private HttpURLConnection client;
+
+    public SysomosClient(String apiKey) {
+        this.apiKey = apiKey;
+    }
+
+    public HeartbeatInfo getHeartbeatInfo(String hid) throws Exception {
+        String urlString = HEARTBEAT_INFO_URL.replace("{api_key}", this.apiKey);
+        urlString = urlString.replace("{hid}", hid);
+        String xmlResponse = SysomosUtils.queryUrl(new URL(urlString));
+        return new HeartbeatInfo(xmlResponse);
+    }
+
+    public RequestBuilder createRequestBuilder() {
+        return new ContentRequestBuilder(BASE_URL_STRING, this.apiKey);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/40ab1593/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
new file mode 100644
index 0000000..c5145fb
--- /dev/null
+++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.sysomos.provider;
+
+import com.sysomos.xml.BeatApi;
+import org.apache.streams.core.StreamsDatum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Provides a {@link java.lang.Runnable} query mechanism for grabbing documents from the Sysomos API
+ */
+public class SysomosHeartbeatStream implements Runnable {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(SysomosHeartbeatStream.class);
+
+    private final SysomosProvider provider;
+    private final SysomosClient client;
+    private final String heartbeatId;
+    private final long maxApiBatch;
+    private final long minLatency;
+
+    private String lastID = null;
+
+    public SysomosHeartbeatStream(SysomosProvider provider, String heartbeatId) {
+        this.provider = provider;
+        this.heartbeatId = heartbeatId;
+
+        this.client = provider.getClient();
+        this.maxApiBatch = provider.getMaxApiBatch();
+        this.minLatency = provider.getMinLatency();
+    }
+
+    @Override
+    public void run() {
+        QueryResult result;
+        //Iff we are trying to get to a specific document ID, continue to query after minimum delay
+        do {
+            LOGGER.debug("Querying API to match last ID of {}", lastID);
+            result = executeAPIRequest();
+            sleep();
+        } while (lastID != null && !result.isMatchedLastId());
+        //Set the last ID so that the next time we are executed we will continue to query only so long as we haven't
+        //found the specific ID
+        lastID = result.getCurrentId();
+        LOGGER.debug("Completed current execution with a final docID of {}", lastID);
+    }
+
+    protected void sleep() {
+        try {
+            Thread.sleep(this.minLatency);
+        } catch (InterruptedException e) {
+            LOGGER.warn("Thread interrupted while sleeping minimum delay", e);
+        }
+    }
+
+    protected QueryResult executeAPIRequest() {
+        BeatApi.BeatResponse response = this.client.createRequestBuilder()
+                .setHeartBeatId(heartbeatId)
+                .setOffset(0)
+                .setReturnSetSize(maxApiBatch).execute();
+
+        LOGGER.debug("Received {} results from API query", response.getCount());
+
+        String currentId = null;
+        boolean matched = false;
+        for(BeatApi.BeatResponse.Beat beat : response.getBeat()) {
+            String docId = beat.getDocid();
+            //We get documents in descending time order.  This will set the id to the latest document
+            if(currentId == null) {
+                currentId = docId;
+            }
+            //We only want to process documents that we know we have not seen before
+            if(lastID != null && lastID.equals(docId)) {
+                matched = true;
+                break;
+            }
+            StreamsDatum item = new StreamsDatum(beat, docId);
+            item.getMetadata().put("heartbeat", this.heartbeatId);
+            this.provider.enqueueItem(item);
+        }
+        return new QueryResult(matched, currentId);
+    }
+
+    private class QueryResult {
+        private boolean matchedLastId;
+        private String currentId;
+
+        private QueryResult(boolean matchedLastId, String currentId) {
+            this.matchedLastId = matchedLastId;
+            this.currentId = currentId;
+        }
+
+        public boolean isMatchedLastId() {
+            return matchedLastId;
+        }
+
+        public void setMatchedLastId(boolean matchedLastId) {
+            this.matchedLastId = matchedLastId;
+        }
+
+        public String getCurrentId() {
+            return currentId;
+        }
+
+        public void setCurrentId(String currentId) {
+            this.currentId = currentId;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/40ab1593/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java
new file mode 100644
index 0000000..eaf9eac
--- /dev/null
+++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.sysomos.provider;
+
+import com.google.common.collect.Queues;
+import com.sysomos.SysomosConfiguration;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.core.StreamsResultSet;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigInteger;
+import java.util.Queue;
+import java.util.concurrent.*;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * Streams Provider for the Sysomos Heartbeat API
+ */
+public class SysomosProvider implements StreamsProvider {
+
+    public final static String STREAMS_ID = "SysomosProvider";
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(SysomosProvider.class);
+
+    public static final int LATENCY = 10000;  //Default minLatency for querying the Sysomos API in milliseconds
+    public static final long PROVIDER_BATCH_SIZE = 10000L; //Default maximum size of the queue
+    public static final long API_BATCH_SIZE = 1000L; //Default maximum size of an API request
+
+    protected volatile Queue<StreamsDatum> providerQueue;
+
+    private final ReadWriteLock lock = new ReentrantReadWriteLock();
+    private final long maxQueued;
+    private final long minLatency;
+    private final long scheduledLatency;
+    private final long maxApiBatch;
+
+    private SysomosClient client;
+    private SysomosConfiguration config;
+    private ScheduledExecutorService stream;
+    private boolean started = false;
+
+    public SysomosProvider(SysomosConfiguration sysomosConfiguration) {
+        this.config = sysomosConfiguration;
+        this.client = new SysomosClient(sysomosConfiguration.getApiKey());
+        this.maxQueued = sysomosConfiguration.getMaxBatchSize() == null ? PROVIDER_BATCH_SIZE : sysomosConfiguration.getMaxBatchSize();
+        this.minLatency = sysomosConfiguration.getMinDelayMs() == null ? LATENCY : sysomosConfiguration.getMinDelayMs();
+        this.scheduledLatency = sysomosConfiguration.getScheduledDelayMs() == null ? (LATENCY * 15) : sysomosConfiguration.getScheduledDelayMs();
+        this.maxApiBatch = sysomosConfiguration.getMinDelayMs() == null ? API_BATCH_SIZE : sysomosConfiguration.getApiBatchSize();
+    }
+
+    public SysomosConfiguration getConfig() {
+        return config;
+    }
+
+    public void setConfig(SysomosConfiguration config) {
+        this.config = config;
+    }
+
+    @Override
+    public void startStream() {
+        LOGGER.trace("Starting Producer");
+        if (!started) {
+            LOGGER.trace("Producer not started.  Initializing");
+            stream = Executors.newScheduledThreadPool(getConfig().getHeartbeatIds().size() + 1);
+            for (String heartbeatId : getConfig().getHeartbeatIds()) {
+                Runnable task = new SysomosHeartbeatStream(this, heartbeatId);
+                stream.scheduleWithFixedDelay(task, 0, this.scheduledLatency, TimeUnit.MILLISECONDS);
+                LOGGER.info("Started producer task for heartbeat {}", heartbeatId);
+            }
+            started = true;
+        }
+    }
+
+    @Override
+    public StreamsResultSet readCurrent() {
+        StreamsResultSet current;
+        try {
+            lock.writeLock().lock();
+            LOGGER.debug("Creating new result set for {} items", providerQueue.size());
+            current = new StreamsResultSet(providerQueue);
+            providerQueue = constructQueue();
+        } finally {
+            lock.writeLock().unlock();
+        }
+
+        return current;
+    }
+
+    @Override
+    public StreamsResultSet readNew(BigInteger bigInteger) {
+        throw new NotImplementedException("readNew not currently implemented");
+    }
+
+    @Override
+    public StreamsResultSet readRange(DateTime dateTime, DateTime dateTime2) {
+        throw new NotImplementedException("readRange not currently implemented");
+    }
+
+    @Override
+    public void prepare(Object configurationObject) {
+        this.providerQueue = constructQueue();
+    }
+
+    @Override
+    public void cleanUp() {
+        stream.shutdown(); // Disable new tasks from being submitted
+        try {
+            // Wait a while for existing tasks to terminate
+            if (!stream.awaitTermination(60, TimeUnit.SECONDS)) {
+                stream.shutdownNow(); // Cancel currently executing tasks
+                // Wait a while for tasks to respond to being cancelled
+                if (!stream.awaitTermination(60, TimeUnit.SECONDS)) {
+                    LOGGER.error("Stream did not terminate");
+                }
+            }
+        } catch (InterruptedException ie) {
+            // (Re-)Cancel if current thread also interrupted
+            stream.shutdownNow();
+            // Preserve interrupt status
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    public long getMinLatency() {
+        return minLatency;
+    }
+
+    public long getMaxApiBatch() {
+        return maxApiBatch;
+    }
+
+    public SysomosClient getClient() {
+        return client;
+    }
+
+    protected void enqueueItem(StreamsDatum datum) {
+        boolean success;
+        do {
+            try {
+                pauseForSpace(); //Dont lock before this pause. We don't want to block the readCurrent method
+                lock.readLock().lock();
+                success = providerQueue.offer(datum);
+                Thread.yield();
+            }finally {
+                lock.readLock().unlock();
+            }
+        }
+        while (!success);
+    }
+
+    /**
+     * Wait for the queue size to be below threshold before allowing execution to continue on this thread
+     */
+    private void pauseForSpace() {
+        while(this.providerQueue.size() >= maxQueued) {
+            LOGGER.trace("Sleeping the current thread due to a full queue");
+            try {
+                Thread.sleep(100);
+                LOGGER.trace("Resuming thread after wait period");
+            } catch (InterruptedException e) {
+                LOGGER.warn("Thread was interrupted", e);
+            }
+        }
+    }
+
+    private Queue<StreamsDatum> constructQueue() {
+        return Queues.newConcurrentLinkedQueue();
+    }
+}


[2/2] git commit: Added new converter

Posted by mf...@apache.org.
Added new converter


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/e6ffe29e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/e6ffe29e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/e6ffe29e

Branch: refs/heads/master
Commit: e6ffe29e8017592f6df881c8f8efc546041536be
Parents: 40ab159
Author: mfranklin <mf...@apache.org>
Authored: Wed Apr 30 10:56:02 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Wed Apr 30 10:56:02 2014 -0400

----------------------------------------------------------------------
 .../streams/mongo/MongoPersistWriter.java       |   3 +-
 .../SysomosBeatActivityConverter.java           | 137 +++++++++++++++++++
 .../sysomos/proessor/SysomosTypeConverter.java  |  56 ++++++++
 .../apache/streams/data/ActivitySerializer.java |   1 +
 4 files changed, 196 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e6ffe29e/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java b/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java
index a4a089a..80541e8 100644
--- a/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java
+++ b/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java
@@ -14,6 +14,7 @@ import com.typesafe.config.Config;
 import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsPersistWriter;
+import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,7 +35,7 @@ public class MongoPersistWriter implements StreamsPersistWriter, Runnable {
 
     protected volatile Queue<StreamsDatum> persistQueue;
 
-    private ObjectMapper mapper = new ObjectMapper();
+    private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
     private volatile AtomicLong lastWrite = new AtomicLong(System.currentTimeMillis());
     private ScheduledExecutorService backgroundFlushTask = Executors.newSingleThreadScheduledExecutor();
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e6ffe29e/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/conversion/SysomosBeatActivityConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/conversion/SysomosBeatActivityConverter.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/conversion/SysomosBeatActivityConverter.java
new file mode 100644
index 0000000..e084946
--- /dev/null
+++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/conversion/SysomosBeatActivityConverter.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.sysomos.conversion;
+
+import com.google.common.collect.Maps;
+import com.sysomos.xml.BeatApi;
+import org.apache.commons.lang.StringUtils;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.ActivityObject;
+import org.apache.streams.pojo.json.Actor;
+import org.apache.streams.pojo.json.Provider;
+import org.joda.time.DateTime;
+
+import java.util.Map;
+
+import static org.apache.streams.data.util.ActivityUtil.*;
+
+/**
+ * Converts an instance of a {@link com.sysomos.xml.BeatApi.BeatResponse.Beat} to an {@link org.apache.streams.pojo.json.Activity}
+ */
+public class SysomosBeatActivityConverter {
+
+    public static final String LANGUAGE_KEY = "LANGUAGE";
+
+    public Activity convert(BeatApi.BeatResponse.Beat beat) {
+        Activity converted = new Activity();
+        converted.setId(beat.getDocid());
+        converted.setVerb("posted");
+        converted.setContent(beat.getContent());
+        converted.setTitle(beat.getTitle());
+        converted.setPublished(new DateTime(beat.getTime()));
+        converted.setUrl(beat.getLink());
+        converted.setActor(new Actor());
+        Map<String, BeatApi.BeatResponse.Beat.Tag> mappedTags = mapTags(beat);
+        Map<String, Object> extensions = ensureExtensions(converted);
+        setLocation(beat, extensions);
+        setObject(beat, converted);
+        setProvider(beat, converted);
+        setLanguage(mappedTags, extensions);
+        extensions.put("sysomos", beat);
+
+        setChannelSpecificValues(beat, converted, mappedTags);
+
+        return converted;
+    }
+
+    protected void setChannelSpecificValues(BeatApi.BeatResponse.Beat beat, Activity converted, Map<String, BeatApi.BeatResponse.Beat.Tag> mappedTags) {
+        String mediaType = beat.getMediaType();
+        String lowerMediaType = mediaType.toLowerCase();
+        Actor actor = converted.getActor();
+        ActivityObject object = converted.getObject();
+        if ("TWITTER".equals(mediaType)) {
+            actor.setId(getPersonId(lowerMediaType, beat.getHost()));
+            actor.setDisplayName(beat.getHost());
+            actor.setUrl("http://twitter.com/" + beat.getHost());
+            object.setObjectType("tweet");
+            object.setId(getObjectId(lowerMediaType, "tweet", beat.getTweetid()));
+        } else if ("FACEBOOK".equals(mediaType)) {
+            actor.setId(getPersonId(lowerMediaType, mappedTags.get("FBID").getValue()));
+            actor.setDisplayName(beat.getTitle());
+            actor.setUrl(beat.getHost());
+            object.setObjectType("post");
+            object.setId(getObjectId(lowerMediaType, "post", String.valueOf(converted.getContent().hashCode())));
+        } else {
+            actor.setId(null);
+            actor.setDisplayName(null);
+            actor.setUrl(null);
+            object.setObjectType("post");
+            object.setId(getObjectId(lowerMediaType, "post", String.valueOf(converted.getContent().hashCode())));
+        }
+    }
+
+    protected void setLanguage(Map<String, BeatApi.BeatResponse.Beat.Tag> mappedTags, Map<String, Object> extensions) {
+        if(mappedTags.containsKey(LANGUAGE_KEY)) {
+            extensions.put(LANGUAGE_EXTENSION, mappedTags.get(LANGUAGE_KEY).getValue());
+        }
+    }
+
+    protected void setObject(BeatApi.BeatResponse.Beat beat, Activity converted) {
+        ActivityObject object = new ActivityObject();
+        converted.setObject(object);
+        object.setUrl(beat.getLink());
+        object.setContent(beat.getContent());
+    }
+
+    @SuppressWarnings("unchecked")
+    protected void setLocation(BeatApi.BeatResponse.Beat beat, Map<String, Object> extensions) {
+        Map<String, Object> location;
+        String country = beat.getLocation().getCountry();
+        if(StringUtils.isNotBlank(country)) {
+            if (extensions.containsKey(LOCATION_EXTENSION)) {
+                location = (Map<String, Object>) extensions.get(LOCATION_EXTENSION);
+            } else {
+                location = Maps.newHashMap();
+                extensions.put(LOCATION_EXTENSION, location);
+            }
+            location.put(LOCATION_EXTENSION_COUNTRY, country);
+        }
+    }
+
+    protected void setProvider(BeatApi.BeatResponse.Beat beat, Activity converted) {
+        Provider provider = new Provider();
+        String mediaType = beat.getMediaType().toLowerCase();
+        provider.setId(getProviderId(mediaType));
+        provider.setDisplayName(StringUtils.capitalize(mediaType));
+        converted.setProvider(provider);
+    }
+
+    protected Map<String, BeatApi.BeatResponse.Beat.Tag> mapTags(BeatApi.BeatResponse.Beat beat) {
+        Map<String, BeatApi.BeatResponse.Beat.Tag> tags = Maps.newHashMap();
+        for(BeatApi.BeatResponse.Beat.Tag tag : beat.getTag()) {
+            if(tag.getSystemType() != null) {
+                tags.put(tag.getSystemType(), tag);
+            }
+        }
+        return tags;
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e6ffe29e/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/org/apache/streams/sysomos/proessor/SysomosTypeConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/org/apache/streams/sysomos/proessor/SysomosTypeConverter.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/org/apache/streams/sysomos/proessor/SysomosTypeConverter.java
new file mode 100644
index 0000000..187d402
--- /dev/null
+++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/org/apache/streams/sysomos/proessor/SysomosTypeConverter.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.sysomos.org.apache.streams.sysomos.proessor;
+
+import com.google.common.collect.Lists;
+import com.sysomos.xml.BeatApi;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.sysomos.conversion.SysomosBeatActivityConverter;
+
+import java.util.List;
+
+/**
+ * Stream processor that converts Sysomos type to Activity
+ */
+public class SysomosTypeConverter implements StreamsProcessor {
+
+    private SysomosBeatActivityConverter converter;
+
+    @Override
+    public List<StreamsDatum> process(StreamsDatum entry) {
+        if(entry.getDocument() instanceof BeatApi.BeatResponse.Beat) {
+            entry.setDocument(converter.convert((BeatApi.BeatResponse.Beat)entry.getDocument()));
+            return Lists.newArrayList(entry);
+        } else {
+            return Lists.newArrayList();
+        }
+    }
+
+    @Override
+    public void prepare(Object configurationObject) {
+        converter = new SysomosBeatActivityConverter();
+    }
+
+    @Override
+    public void cleanUp() {
+        //NOP
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e6ffe29e/streams-pojo/src/main/java/org/apache/streams/data/ActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/java/org/apache/streams/data/ActivitySerializer.java b/streams-pojo/src/main/java/org/apache/streams/data/ActivitySerializer.java
index ad3809f..23903e5 100644
--- a/streams-pojo/src/main/java/org/apache/streams/data/ActivitySerializer.java
+++ b/streams-pojo/src/main/java/org/apache/streams/data/ActivitySerializer.java
@@ -23,6 +23,7 @@ import org.apache.streams.pojo.json.Activity;
 
 import java.util.List;
 
+//TODO:  Change the name of this class to ActivityConverter  STREAMS-68
 /**
  * Serializes and deserializes Activities
  */