You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by mf...@apache.org on 2014/04/30 16:58:06 UTC
[1/2] git commit: Isolated to new package.
Repository: incubator-streams
Updated Branches:
refs/heads/master be58524bf -> e6ffe29e8
Isolated to new package.
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/40ab1593
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/40ab1593
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/40ab1593
Branch: refs/heads/master
Commit: 40ab15939465f8ceafd1681fff23ebd2f01e35f6
Parents: be58524
Author: mfranklin <mf...@apache.org>
Authored: Tue Apr 29 11:44:57 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Tue Apr 29 11:44:57 2014 -0400
----------------------------------------------------------------------
.../streams/sysomos/AbstractRequestBuilder.java | 57 ------
.../streams/sysomos/ContentRequestBuilder.java | 134 -------------
.../apache/streams/sysomos/RequestBuilder.java | 96 ----------
.../apache/streams/sysomos/SysomosClient.java | 63 ------
.../streams/sysomos/SysomosHeartbeatStream.java | 127 ------------
.../apache/streams/sysomos/SysomosProvider.java | 191 -------------------
.../provider/AbstractRequestBuilder.java | 58 ++++++
.../sysomos/provider/ContentRequestBuilder.java | 135 +++++++++++++
.../sysomos/provider/RequestBuilder.java | 96 ++++++++++
.../streams/sysomos/provider/SysomosClient.java | 55 ++++++
.../provider/SysomosHeartbeatStream.java | 127 ++++++++++++
.../sysomos/provider/SysomosProvider.java | 191 +++++++++++++++++++
12 files changed, 662 insertions(+), 668 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/40ab1593/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/AbstractRequestBuilder.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/AbstractRequestBuilder.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/AbstractRequestBuilder.java
deleted file mode 100644
index 0699a1a..0000000
--- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/AbstractRequestBuilder.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.sysomos;
-
-import com.sysomos.xml.BeatApi;
-import com.sysomos.xml.ObjectFactory;
-import org.apache.streams.sysomos.util.SysomosUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.xml.bind.JAXBContext;
-import javax.xml.bind.JAXBException;
-import javax.xml.bind.Unmarshaller;
-import java.io.StringReader;
-import java.net.URL;
-
-/**
- * Defines a common pattern for requesting data from the Sysomos API.
- */
-public abstract class AbstractRequestBuilder implements RequestBuilder {
- private final static Logger LOGGER = LoggerFactory.getLogger(AbstractRequestBuilder.class);
-
- /**
- * Executes the request to the Sysomos Heartbeat API and returns a valid response
- */
- public BeatApi.BeatResponse execute() {
- URL url = this.getRequestUrl();
- try {
- String xmlResponse = SysomosUtils.queryUrl(url);
- JAXBContext context = JAXBContext.newInstance(ObjectFactory.class);
- Unmarshaller unmarshaller = context.createUnmarshaller();
- BeatApi beatApi = (BeatApi) unmarshaller.unmarshal(new StringReader(xmlResponse));
- return beatApi.getBeatResponse();
- } catch (JAXBException e) {
- LOGGER.error("Unable to unmarshal XML content");
- throw new SysomosException("Unable to unmarshal XML content", e);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/40ab1593/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/ContentRequestBuilder.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/ContentRequestBuilder.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/ContentRequestBuilder.java
deleted file mode 100644
index d632ad6..0000000
--- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/ContentRequestBuilder.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.streams.sysomos;
-
-import org.joda.time.DateTime;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.MalformedURLException;
-import java.net.URL;
-
-import static org.apache.streams.sysomos.util.SysomosUtils.*;
-
-/**
- * Builds requests for the Sysomos Heartbeat Content API. This is the preferred method of
- * accessing data from Sysomoos Heartbeat
- */
-public class ContentRequestBuilder extends AbstractRequestBuilder implements RequestBuilder {
-
- private final static Logger LOGGER = LoggerFactory.getLogger(ContentRequestBuilder.class);
-
- private String baseUrl;
- private String hid;
- private String addedAfter;
- private String addedBefore;
- private String size;
- private String offset;
- private String apiKey;
-
- /**
- * The max number of items you are allowed to get per request.
- */
- public static final int MAX_ALLOWED_PER_REQUEST = 10000;
-
- /**
- * Constructs a new ContentRequestBuilder for the specified API key and Sysomos URL
- * @param baseUrl the base URL for the sysomos API
- * @param apiKey the API key generated by Sysomos for authorization
- */
- protected ContentRequestBuilder(String baseUrl, String apiKey) {
- this.baseUrl = baseUrl;
- this.apiKey = apiKey;
- }
-
- /**
- * Gets the Request URL based on the local fields
- * @return a valid URL for the Sysomos API or an exception
- */
- @Override
- public URL getRequestUrl() {
- StringBuilder url = new StringBuilder();
- url.append(this.baseUrl);
- url.append("dev/v1/heartbeat/content?");
- url.append("apiKey=");
- url.append(this.apiKey);
- url.append("&hid=");
- url.append(this.hid);
- if (size != null) {
- url.append("&size=");
- url.append(this.size);
- }
- if (this.addedAfter != null) {
- url.append("&addedAfter=");
- url.append(this.addedAfter);
- }
- if (this.addedBefore != null) {
- url.append("&addedBefore=");
- url.append(this.addedBefore);
- }
- if (this.offset != null) {
- url.append("&offset=");
- url.append(this.offset);
- }
- String urlString = url.toString();
- LOGGER.debug("Constructed Request URL: {}", urlString);
- try {
- return new URL(urlString);
- } catch (MalformedURLException e) {
- throw new SysomosException("Malformed Request URL. Check Request Builder parameters", e);
- }
- }
-
- @Override
- public RequestBuilder setHeartBeatId(int hid) {
- return setHeartBeatId(Integer.toString(hid));
- }
-
- @Override
- public RequestBuilder setHeartBeatId(String hid) {
- this.hid = hid;
- return this;
- }
-
- @Override
- public RequestBuilder setAddedAfterDate(DateTime afterDate) {
- this.addedAfter = SYSOMOS_DATE_FORMATTER.print(afterDate);
- return this;
- }
-
- @Override
- public RequestBuilder setAddedBeforeDate(DateTime beforeDate) {
- this.addedBefore = SYSOMOS_DATE_FORMATTER.print(beforeDate);
- return this;
- }
-
- @Override
- public RequestBuilder setReturnSetSize(long size) {
- this.size = Long.toString(Math.min(size, MAX_ALLOWED_PER_REQUEST));
- return this;
- }
-
- @Override
- public RequestBuilder setOffset(long offset) {
- this.offset = Long.toString(offset);
- return this;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/40ab1593/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/RequestBuilder.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/RequestBuilder.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/RequestBuilder.java
deleted file mode 100644
index 8c31d88..0000000
--- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/RequestBuilder.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.sysomos;
-
-import com.sysomos.xml.BeatApi;
-import org.joda.time.DateTime;
-
-import java.net.URL;
-
-/**
- * Simplifying abstraction that aids in building a request to the Sysomos API in a chained fashion
- */
-public interface RequestBuilder {
- /**
- * Sets the date after which documents should be returned from Sysomos
- * @param afterDate the {@link org.joda.time.DateTime} instance representing the after date
- *
- * @return The RequestBuilder for continued Chaining
- */
- RequestBuilder setAddedAfterDate(DateTime afterDate);
-
- /**
- * Sets the date before which documents should be returned from Sysomos
- * @param beforeDate the {@link org.joda.time.DateTime} instance representing the before date
- *
- * @return The RequestBuilder for continued Chaining
- */
- RequestBuilder setAddedBeforeDate(DateTime beforeDate);
-
- /**
- * Sets the size of the expected response
- * @param size the number of documents
- *
- * @return The RequestBuilder for continued Chaining
- */
- RequestBuilder setReturnSetSize(long size);
-
- /**
- * Sets the starting offset for the number of documents given the other parameters
- * @param offset the starting offset
- *
- * @return The RequestBuilder for continued Chaining
- */
- RequestBuilder setOffset(long offset);
-
- /**
- * Sets the Sysomos Heartbeat ID
- * @param hid Heartbeat ID
- *
- * @return The RequestBuilder for continued Chaining
- */
- RequestBuilder setHeartBeatId(int hid);
-
- /**
- *
- * Sets the Sysomos Heartbeat ID as a String
- * @param hid Heartbeat ID string
- *
- * @return The RequestBuilder for continued Chaining
- */
- RequestBuilder setHeartBeatId(String hid);
-
- /**
- * Returns the full url need to execute a request.
- *
- * Example:
- * http://api.sysomos.com/dev/v1/heartbeat/content?apiKey=YOUR
- * -APIKEY&hid=YOUR-HEARTBEAT-ID&offset=0&size=10&
- * addedAfter=2010-10-15T13:00:00Z&addedBefore=2010-10-18T13:00:00Z
- *
- * @return the URL to use when making requests of Sysomos Heartbeat
- */
- URL getRequestUrl();
-
- /**
- * Executes the request to the Sysomos Heartbeat API and returns a valid response
- */
- BeatApi.BeatResponse execute();
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/40ab1593/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/SysomosClient.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/SysomosClient.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/SysomosClient.java
deleted file mode 100644
index 2fe082a..0000000
--- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/SysomosClient.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.sysomos;
-
-import com.google.common.base.Strings;
-import com.sysomos.json.Sysomos;
-import org.apache.commons.io.IOUtils;
-import org.apache.streams.sysomos.data.HeartbeatInfo;
-import org.apache.streams.sysomos.util.SysomosUtils;
-
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.StringWriter;
-import java.net.HttpURLConnection;
-import java.net.URL;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-/**
- * Wrapper for the Sysomos API.
- */
-public class SysomosClient {
-
- public static final String BASE_URL_STRING = "http://api.sysomos.com/";
- private static final String HEARTBEAT_INFO_URL = "http://api.sysomos.com/v1/heartbeat/info?apiKey={api_key}&hid={hid}";
-
- private String apiKey;
-
- private HttpURLConnection client;
-
- public SysomosClient(String apiKey) {
- this.apiKey = apiKey;
- }
-
- public HeartbeatInfo getHeartbeatInfo(String hid) throws Exception {
- String urlString = HEARTBEAT_INFO_URL.replace("{api_key}", this.apiKey);
- urlString = urlString.replace("{hid}", hid);
- String xmlResponse = SysomosUtils.queryUrl(new URL(urlString));
- return new HeartbeatInfo(xmlResponse);
- }
-
- public RequestBuilder createRequestBuilder() {
- return new ContentRequestBuilder(BASE_URL_STRING, this.apiKey);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/40ab1593/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/SysomosHeartbeatStream.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/SysomosHeartbeatStream.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/SysomosHeartbeatStream.java
deleted file mode 100644
index e552ecd..0000000
--- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/SysomosHeartbeatStream.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.sysomos;
-
-import com.sysomos.xml.BeatApi;
-import org.apache.streams.core.StreamsDatum;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Provides a {@link java.lang.Runnable} query mechanism for grabbing documents from the Sysomos API
- */
-public class SysomosHeartbeatStream implements Runnable {
-
- private final static Logger LOGGER = LoggerFactory.getLogger(SysomosHeartbeatStream.class);
-
- private final SysomosProvider provider;
- private final SysomosClient client;
- private final String heartbeatId;
- private final long maxApiBatch;
- private final long minLatency;
-
- private String lastID = null;
-
- public SysomosHeartbeatStream(SysomosProvider provider, String heartbeatId) {
- this.provider = provider;
- this.heartbeatId = heartbeatId;
-
- this.client = provider.getClient();
- this.maxApiBatch = provider.getMaxApiBatch();
- this.minLatency = provider.getMinLatency();
- }
-
- @Override
- public void run() {
- QueryResult result;
- //Iff we are trying to get to a specific document ID, continue to query after minimum delay
- do {
- LOGGER.debug("Querying API to match last ID of {}", lastID);
- result = executeAPIRequest();
- sleep();
- } while (lastID != null && !result.isMatchedLastId());
- //Set the last ID so that the next time we are executed we will continue to query only so long as we haven't
- //found the specific ID
- lastID = result.getCurrentId();
- LOGGER.debug("Completed current execution with a final docID of {}", lastID);
- }
-
- protected void sleep() {
- try {
- Thread.sleep(this.minLatency);
- } catch (InterruptedException e) {
- LOGGER.warn("Thread interrupted while sleeping minimum delay", e);
- }
- }
-
- protected QueryResult executeAPIRequest() {
- BeatApi.BeatResponse response = this.client.createRequestBuilder()
- .setHeartBeatId(heartbeatId)
- .setOffset(0)
- .setReturnSetSize(maxApiBatch).execute();
-
- LOGGER.debug("Received {} results from API query", response.getCount());
-
- String currentId = null;
- boolean matched = false;
- for(BeatApi.BeatResponse.Beat beat : response.getBeat()) {
- String docId = beat.getDocid();
- //We get documents in descending time order. This will set the id to the latest document
- if(currentId == null) {
- currentId = docId;
- }
- //We only want to process documents that we know we have not seen before
- if(lastID != null && lastID.equals(docId)) {
- matched = true;
- break;
- }
- StreamsDatum item = new StreamsDatum(beat, docId);
- item.getMetadata().put("heartbeat", this.heartbeatId);
- this.provider.enqueueItem(item);
- }
- return new QueryResult(matched, currentId);
- }
-
- private class QueryResult {
- private boolean matchedLastId;
- private String currentId;
-
- private QueryResult(boolean matchedLastId, String currentId) {
- this.matchedLastId = matchedLastId;
- this.currentId = currentId;
- }
-
- public boolean isMatchedLastId() {
- return matchedLastId;
- }
-
- public void setMatchedLastId(boolean matchedLastId) {
- this.matchedLastId = matchedLastId;
- }
-
- public String getCurrentId() {
- return currentId;
- }
-
- public void setCurrentId(String currentId) {
- this.currentId = currentId;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/40ab1593/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/SysomosProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/SysomosProvider.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/SysomosProvider.java
deleted file mode 100644
index 9f12cc7..0000000
--- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/SysomosProvider.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.sysomos;
-
-import com.google.common.collect.Queues;
-import com.sysomos.SysomosConfiguration;
-import org.apache.commons.lang.NotImplementedException;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProvider;
-import org.apache.streams.core.StreamsResultSet;
-import org.joda.time.DateTime;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.math.BigInteger;
-import java.util.Queue;
-import java.util.concurrent.*;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-/**
- * Streams Provider for the Sysomos Heartbeat API
- */
-public class SysomosProvider implements StreamsProvider {
-
- public final static String STREAMS_ID = "SysomosProvider";
-
- private final static Logger LOGGER = LoggerFactory.getLogger(SysomosProvider.class);
-
- public static final int LATENCY = 10000; //Default minLatency for querying the Sysomos API in milliseconds
- public static final long PROVIDER_BATCH_SIZE = 10000L; //Default maximum size of the queue
- public static final long API_BATCH_SIZE = 1000L; //Default maximum size of an API request
-
- protected volatile Queue<StreamsDatum> providerQueue;
-
- private final ReadWriteLock lock = new ReentrantReadWriteLock();
- private final long maxQueued;
- private final long minLatency;
- private final long scheduledLatency;
- private final long maxApiBatch;
-
- private SysomosClient client;
- private SysomosConfiguration config;
- private ScheduledExecutorService stream;
- private boolean started = false;
-
- public SysomosProvider(SysomosConfiguration sysomosConfiguration) {
- this.config = sysomosConfiguration;
- this.client = new SysomosClient(sysomosConfiguration.getApiKey());
- this.maxQueued = sysomosConfiguration.getMaxBatchSize() == null ? PROVIDER_BATCH_SIZE : sysomosConfiguration.getMaxBatchSize();
- this.minLatency = sysomosConfiguration.getMinDelayMs() == null ? LATENCY : sysomosConfiguration.getMinDelayMs();
- this.scheduledLatency = sysomosConfiguration.getScheduledDelayMs() == null ? (LATENCY * 15) : sysomosConfiguration.getScheduledDelayMs();
- this.maxApiBatch = sysomosConfiguration.getMinDelayMs() == null ? API_BATCH_SIZE : sysomosConfiguration.getApiBatchSize();
- }
-
- public SysomosConfiguration getConfig() {
- return config;
- }
-
- public void setConfig(SysomosConfiguration config) {
- this.config = config;
- }
-
- @Override
- public void startStream() {
- LOGGER.trace("Starting Producer");
- if (!started) {
- LOGGER.trace("Producer not started. Initializing");
- stream = Executors.newScheduledThreadPool(getConfig().getHeartbeatIds().size() + 1);
- for (String heartbeatId : getConfig().getHeartbeatIds()) {
- Runnable task = new SysomosHeartbeatStream(this, heartbeatId);
- stream.scheduleWithFixedDelay(task, 0, this.scheduledLatency, TimeUnit.MILLISECONDS);
- LOGGER.info("Started producer task for heartbeat {}", heartbeatId);
- }
- started = true;
- }
- }
-
- @Override
- public StreamsResultSet readCurrent() {
- StreamsResultSet current;
- try {
- lock.writeLock().lock();
- LOGGER.debug("Creating new result set for {} items", providerQueue.size());
- current = new StreamsResultSet(providerQueue);
- providerQueue = constructQueue();
- } finally {
- lock.writeLock().unlock();
- }
-
- return current;
- }
-
- @Override
- public StreamsResultSet readNew(BigInteger bigInteger) {
- throw new NotImplementedException("readNew not currently implemented");
- }
-
- @Override
- public StreamsResultSet readRange(DateTime dateTime, DateTime dateTime2) {
- throw new NotImplementedException("readRange not currently implemented");
- }
-
- @Override
- public void prepare(Object configurationObject) {
- this.providerQueue = constructQueue();
- }
-
- @Override
- public void cleanUp() {
- stream.shutdown(); // Disable new tasks from being submitted
- try {
- // Wait a while for existing tasks to terminate
- if (!stream.awaitTermination(60, TimeUnit.SECONDS)) {
- stream.shutdownNow(); // Cancel currently executing tasks
- // Wait a while for tasks to respond to being cancelled
- if (!stream.awaitTermination(60, TimeUnit.SECONDS)) {
- LOGGER.error("Stream did not terminate");
- }
- }
- } catch (InterruptedException ie) {
- // (Re-)Cancel if current thread also interrupted
- stream.shutdownNow();
- // Preserve interrupt status
- Thread.currentThread().interrupt();
- }
- }
-
- public long getMinLatency() {
- return minLatency;
- }
-
- public long getMaxApiBatch() {
- return maxApiBatch;
- }
-
- public SysomosClient getClient() {
- return client;
- }
-
- protected void enqueueItem(StreamsDatum datum) {
- boolean success;
- do {
- try {
- pauseForSpace(); //Dont lock before this pause. We don't want to block the readCurrent method
- lock.readLock().lock();
- success = providerQueue.offer(datum);
- Thread.yield();
- }finally {
- lock.readLock().unlock();
- }
- }
- while (!success);
- }
-
- /**
- * Wait for the queue size to be below threshold before allowing execution to continue on this thread
- */
- private void pauseForSpace() {
- while(this.providerQueue.size() >= maxQueued) {
- LOGGER.trace("Sleeping the current thread due to a full queue");
- try {
- Thread.sleep(100);
- LOGGER.trace("Resuming thread after wait period");
- } catch (InterruptedException e) {
- LOGGER.warn("Thread was interrupted", e);
- }
- }
- }
-
- private Queue<StreamsDatum> constructQueue() {
- return Queues.newConcurrentLinkedQueue();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/40ab1593/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/AbstractRequestBuilder.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/AbstractRequestBuilder.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/AbstractRequestBuilder.java
new file mode 100644
index 0000000..28f33df
--- /dev/null
+++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/AbstractRequestBuilder.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.sysomos.provider;
+
+import com.sysomos.xml.BeatApi;
+import com.sysomos.xml.ObjectFactory;
+import org.apache.streams.sysomos.SysomosException;
+import org.apache.streams.sysomos.util.SysomosUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Unmarshaller;
+import java.io.StringReader;
+import java.net.URL;
+
+/**
+ * Defines a common pattern for requesting data from the Sysomos API.
+ */
+public abstract class AbstractRequestBuilder implements RequestBuilder {
+ private final static Logger LOGGER = LoggerFactory.getLogger(AbstractRequestBuilder.class);
+
+ /**
+ * Executes the request to the Sysomos Heartbeat API and returns a valid response
+ */
+ public BeatApi.BeatResponse execute() {
+ URL url = this.getRequestUrl();
+ try {
+ String xmlResponse = SysomosUtils.queryUrl(url);
+ JAXBContext context = JAXBContext.newInstance(ObjectFactory.class);
+ Unmarshaller unmarshaller = context.createUnmarshaller();
+ BeatApi beatApi = (BeatApi) unmarshaller.unmarshal(new StringReader(xmlResponse));
+ return beatApi.getBeatResponse();
+ } catch (JAXBException e) {
+ LOGGER.error("Unable to unmarshal XML content");
+ throw new SysomosException("Unable to unmarshal XML content", e);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/40ab1593/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/ContentRequestBuilder.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/ContentRequestBuilder.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/ContentRequestBuilder.java
new file mode 100644
index 0000000..79c7f12
--- /dev/null
+++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/ContentRequestBuilder.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.streams.sysomos.provider;
+
+import org.apache.streams.sysomos.SysomosException;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+
+import static org.apache.streams.sysomos.util.SysomosUtils.*;
+
+/**
+ * Builds requests for the Sysomos Heartbeat Content API. This is the preferred method of
+ * accessing data from Sysomoos Heartbeat
+ */
+public class ContentRequestBuilder extends AbstractRequestBuilder implements RequestBuilder {
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(ContentRequestBuilder.class);
+
+ private String baseUrl;
+ private String hid;
+ private String addedAfter;
+ private String addedBefore;
+ private String size;
+ private String offset;
+ private String apiKey;
+
+ /**
+ * The max number of items you are allowed to get per request.
+ */
+ public static final int MAX_ALLOWED_PER_REQUEST = 10000;
+
+ /**
+ * Constructs a new ContentRequestBuilder for the specified API key and Sysomos URL
+ * @param baseUrl the base URL for the sysomos API
+ * @param apiKey the API key generated by Sysomos for authorization
+ */
+ protected ContentRequestBuilder(String baseUrl, String apiKey) {
+ this.baseUrl = baseUrl;
+ this.apiKey = apiKey;
+ }
+
+ /**
+ * Gets the Request URL based on the local fields
+ * @return a valid URL for the Sysomos API or an exception
+ */
+ @Override
+ public URL getRequestUrl() {
+ StringBuilder url = new StringBuilder();
+ url.append(this.baseUrl);
+ url.append("dev/v1/heartbeat/content?");
+ url.append("apiKey=");
+ url.append(this.apiKey);
+ url.append("&hid=");
+ url.append(this.hid);
+ if (size != null) {
+ url.append("&size=");
+ url.append(this.size);
+ }
+ if (this.addedAfter != null) {
+ url.append("&addedAfter=");
+ url.append(this.addedAfter);
+ }
+ if (this.addedBefore != null) {
+ url.append("&addedBefore=");
+ url.append(this.addedBefore);
+ }
+ if (this.offset != null) {
+ url.append("&offset=");
+ url.append(this.offset);
+ }
+ String urlString = url.toString();
+ LOGGER.debug("Constructed Request URL: {}", urlString);
+ try {
+ return new URL(urlString);
+ } catch (MalformedURLException e) {
+ throw new SysomosException("Malformed Request URL. Check Request Builder parameters", e);
+ }
+ }
+
+ @Override
+ public RequestBuilder setHeartBeatId(int hid) {
+ return setHeartBeatId(Integer.toString(hid));
+ }
+
+ @Override
+ public RequestBuilder setHeartBeatId(String hid) {
+ this.hid = hid;
+ return this;
+ }
+
+ @Override
+ public RequestBuilder setAddedAfterDate(DateTime afterDate) {
+ this.addedAfter = SYSOMOS_DATE_FORMATTER.print(afterDate);
+ return this;
+ }
+
+ @Override
+ public RequestBuilder setAddedBeforeDate(DateTime beforeDate) {
+ this.addedBefore = SYSOMOS_DATE_FORMATTER.print(beforeDate);
+ return this;
+ }
+
+ @Override
+ public RequestBuilder setReturnSetSize(long size) {
+ this.size = Long.toString(Math.min(size, MAX_ALLOWED_PER_REQUEST));
+ return this;
+ }
+
+ @Override
+ public RequestBuilder setOffset(long offset) {
+ this.offset = Long.toString(offset);
+ return this;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/40ab1593/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/RequestBuilder.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/RequestBuilder.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/RequestBuilder.java
new file mode 100644
index 0000000..0e12025
--- /dev/null
+++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/RequestBuilder.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.sysomos.provider;
+
+import com.sysomos.xml.BeatApi;
+import org.joda.time.DateTime;
+
+import java.net.URL;
+
+/**
+ * Simplifying abstraction that aids in building a request to the Sysomos API in a chained fashion
+ */
+public interface RequestBuilder {
+ /**
+ * Sets the date after which documents should be returned from Sysomos
+ * @param afterDate the {@link org.joda.time.DateTime} instance representing the after date
+ *
+ * @return The RequestBuilder for continued Chaining
+ */
+ RequestBuilder setAddedAfterDate(DateTime afterDate);
+
+ /**
+ * Sets the date before which documents should be returned from Sysomos
+ * @param beforeDate the {@link org.joda.time.DateTime} instance representing the before date
+ *
+ * @return The RequestBuilder for continued Chaining
+ */
+ RequestBuilder setAddedBeforeDate(DateTime beforeDate);
+
+ /**
+ * Sets the size of the expected response
+ * @param size the number of documents
+ *
+ * @return The RequestBuilder for continued Chaining
+ */
+ RequestBuilder setReturnSetSize(long size);
+
+ /**
+ * Sets the starting offset for the number of documents given the other parameters
+ * @param offset the starting offset
+ *
+ * @return The RequestBuilder for continued Chaining
+ */
+ RequestBuilder setOffset(long offset);
+
+ /**
+ * Sets the Sysomos Heartbeat ID
+ * @param hid Heartbeat ID
+ *
+ * @return The RequestBuilder for continued Chaining
+ */
+ RequestBuilder setHeartBeatId(int hid);
+
+ /**
+ *
+ * Sets the Sysomos Heartbeat ID as a String
+ * @param hid Heartbeat ID string
+ *
+ * @return The RequestBuilder for continued Chaining
+ */
+ RequestBuilder setHeartBeatId(String hid);
+
+ /**
+ * Returns the full url need to execute a request.
+ *
+ * Example:
+ * http://api.sysomos.com/dev/v1/heartbeat/content?apiKey=YOUR
+ * -APIKEY&hid=YOUR-HEARTBEAT-ID&offset=0&size=10&
+ * addedAfter=2010-10-15T13:00:00Z&addedBefore=2010-10-18T13:00:00Z
+ *
+ * @return the URL to use when making requests of Sysomos Heartbeat
+ */
+ URL getRequestUrl();
+
+ /**
+ * Executes the request to the Sysomos Heartbeat API and returns a valid response
+ */
+ BeatApi.BeatResponse execute();
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/40ab1593/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosClient.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosClient.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosClient.java
new file mode 100644
index 0000000..488b6c7
--- /dev/null
+++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosClient.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.sysomos.provider;
+
+import org.apache.streams.sysomos.data.HeartbeatInfo;
+import org.apache.streams.sysomos.util.SysomosUtils;
+
+import java.net.HttpURLConnection;
+import java.net.URL;
+
+/**
+ * Wrapper for the Sysomos API.
+ */
+public class SysomosClient {
+
+ public static final String BASE_URL_STRING = "http://api.sysomos.com/";
+ private static final String HEARTBEAT_INFO_URL = "http://api.sysomos.com/v1/heartbeat/info?apiKey={api_key}&hid={hid}";
+
+ private String apiKey;
+
+ private HttpURLConnection client;
+
+ public SysomosClient(String apiKey) {
+ this.apiKey = apiKey;
+ }
+
+ public HeartbeatInfo getHeartbeatInfo(String hid) throws Exception {
+ String urlString = HEARTBEAT_INFO_URL.replace("{api_key}", this.apiKey);
+ urlString = urlString.replace("{hid}", hid);
+ String xmlResponse = SysomosUtils.queryUrl(new URL(urlString));
+ return new HeartbeatInfo(xmlResponse);
+ }
+
+ public RequestBuilder createRequestBuilder() {
+ return new ContentRequestBuilder(BASE_URL_STRING, this.apiKey);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/40ab1593/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
new file mode 100644
index 0000000..c5145fb
--- /dev/null
+++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.sysomos.provider;
+
+import com.sysomos.xml.BeatApi;
+import org.apache.streams.core.StreamsDatum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Provides a {@link java.lang.Runnable} query mechanism for grabbing documents from the Sysomos API
+ */
+public class SysomosHeartbeatStream implements Runnable {
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(SysomosHeartbeatStream.class);
+
+ private final SysomosProvider provider;
+ private final SysomosClient client;
+ private final String heartbeatId;
+ private final long maxApiBatch;
+ private final long minLatency;
+
+ private String lastID = null;
+
+ public SysomosHeartbeatStream(SysomosProvider provider, String heartbeatId) {
+ this.provider = provider;
+ this.heartbeatId = heartbeatId;
+
+ this.client = provider.getClient();
+ this.maxApiBatch = provider.getMaxApiBatch();
+ this.minLatency = provider.getMinLatency();
+ }
+
+ @Override
+ public void run() {
+ QueryResult result;
+ //Iff we are trying to get to a specific document ID, continue to query after minimum delay
+ do {
+ LOGGER.debug("Querying API to match last ID of {}", lastID);
+ result = executeAPIRequest();
+ sleep();
+ } while (lastID != null && !result.isMatchedLastId());
+ //Set the last ID so that the next time we are executed we will continue to query only so long as we haven't
+ //found the specific ID
+ lastID = result.getCurrentId();
+ LOGGER.debug("Completed current execution with a final docID of {}", lastID);
+ }
+
+ protected void sleep() {
+ try {
+ Thread.sleep(this.minLatency);
+ } catch (InterruptedException e) {
+ LOGGER.warn("Thread interrupted while sleeping minimum delay", e);
+ }
+ }
+
+ protected QueryResult executeAPIRequest() {
+ BeatApi.BeatResponse response = this.client.createRequestBuilder()
+ .setHeartBeatId(heartbeatId)
+ .setOffset(0)
+ .setReturnSetSize(maxApiBatch).execute();
+
+ LOGGER.debug("Received {} results from API query", response.getCount());
+
+ String currentId = null;
+ boolean matched = false;
+ for(BeatApi.BeatResponse.Beat beat : response.getBeat()) {
+ String docId = beat.getDocid();
+ //We get documents in descending time order. This will set the id to the latest document
+ if(currentId == null) {
+ currentId = docId;
+ }
+ //We only want to process documents that we know we have not seen before
+ if(lastID != null && lastID.equals(docId)) {
+ matched = true;
+ break;
+ }
+ StreamsDatum item = new StreamsDatum(beat, docId);
+ item.getMetadata().put("heartbeat", this.heartbeatId);
+ this.provider.enqueueItem(item);
+ }
+ return new QueryResult(matched, currentId);
+ }
+
+ private class QueryResult {
+ private boolean matchedLastId;
+ private String currentId;
+
+ private QueryResult(boolean matchedLastId, String currentId) {
+ this.matchedLastId = matchedLastId;
+ this.currentId = currentId;
+ }
+
+ public boolean isMatchedLastId() {
+ return matchedLastId;
+ }
+
+ public void setMatchedLastId(boolean matchedLastId) {
+ this.matchedLastId = matchedLastId;
+ }
+
+ public String getCurrentId() {
+ return currentId;
+ }
+
+ public void setCurrentId(String currentId) {
+ this.currentId = currentId;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/40ab1593/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java
new file mode 100644
index 0000000..eaf9eac
--- /dev/null
+++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.sysomos.provider;
+
+import com.google.common.collect.Queues;
+import com.sysomos.SysomosConfiguration;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.core.StreamsResultSet;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigInteger;
+import java.util.Queue;
+import java.util.concurrent.*;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * Streams Provider for the Sysomos Heartbeat API
+ */
+public class SysomosProvider implements StreamsProvider {
+
+ public final static String STREAMS_ID = "SysomosProvider";
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(SysomosProvider.class);
+
+ public static final int LATENCY = 10000; //Default minLatency for querying the Sysomos API in milliseconds
+ public static final long PROVIDER_BATCH_SIZE = 10000L; //Default maximum size of the queue
+ public static final long API_BATCH_SIZE = 1000L; //Default maximum size of an API request
+
+ protected volatile Queue<StreamsDatum> providerQueue;
+
+ private final ReadWriteLock lock = new ReentrantReadWriteLock();
+ private final long maxQueued;
+ private final long minLatency;
+ private final long scheduledLatency;
+ private final long maxApiBatch;
+
+ private SysomosClient client;
+ private SysomosConfiguration config;
+ private ScheduledExecutorService stream;
+ private boolean started = false;
+
+ public SysomosProvider(SysomosConfiguration sysomosConfiguration) {
+ this.config = sysomosConfiguration;
+ this.client = new SysomosClient(sysomosConfiguration.getApiKey());
+ this.maxQueued = sysomosConfiguration.getMaxBatchSize() == null ? PROVIDER_BATCH_SIZE : sysomosConfiguration.getMaxBatchSize();
+ this.minLatency = sysomosConfiguration.getMinDelayMs() == null ? LATENCY : sysomosConfiguration.getMinDelayMs();
+ this.scheduledLatency = sysomosConfiguration.getScheduledDelayMs() == null ? (LATENCY * 15) : sysomosConfiguration.getScheduledDelayMs();
+ this.maxApiBatch = sysomosConfiguration.getMinDelayMs() == null ? API_BATCH_SIZE : sysomosConfiguration.getApiBatchSize();
+ }
+
+ public SysomosConfiguration getConfig() {
+ return config;
+ }
+
+ public void setConfig(SysomosConfiguration config) {
+ this.config = config;
+ }
+
+ @Override
+ public void startStream() {
+ LOGGER.trace("Starting Producer");
+ if (!started) {
+ LOGGER.trace("Producer not started. Initializing");
+ stream = Executors.newScheduledThreadPool(getConfig().getHeartbeatIds().size() + 1);
+ for (String heartbeatId : getConfig().getHeartbeatIds()) {
+ Runnable task = new SysomosHeartbeatStream(this, heartbeatId);
+ stream.scheduleWithFixedDelay(task, 0, this.scheduledLatency, TimeUnit.MILLISECONDS);
+ LOGGER.info("Started producer task for heartbeat {}", heartbeatId);
+ }
+ started = true;
+ }
+ }
+
+ @Override
+ public StreamsResultSet readCurrent() {
+ StreamsResultSet current;
+ try {
+ lock.writeLock().lock();
+ LOGGER.debug("Creating new result set for {} items", providerQueue.size());
+ current = new StreamsResultSet(providerQueue);
+ providerQueue = constructQueue();
+ } finally {
+ lock.writeLock().unlock();
+ }
+
+ return current;
+ }
+
+ @Override
+ public StreamsResultSet readNew(BigInteger bigInteger) {
+ throw new NotImplementedException("readNew not currently implemented");
+ }
+
+ @Override
+ public StreamsResultSet readRange(DateTime dateTime, DateTime dateTime2) {
+ throw new NotImplementedException("readRange not currently implemented");
+ }
+
+ @Override
+ public void prepare(Object configurationObject) {
+ this.providerQueue = constructQueue();
+ }
+
+ @Override
+ public void cleanUp() {
+ stream.shutdown(); // Disable new tasks from being submitted
+ try {
+ // Wait a while for existing tasks to terminate
+ if (!stream.awaitTermination(60, TimeUnit.SECONDS)) {
+ stream.shutdownNow(); // Cancel currently executing tasks
+ // Wait a while for tasks to respond to being cancelled
+ if (!stream.awaitTermination(60, TimeUnit.SECONDS)) {
+ LOGGER.error("Stream did not terminate");
+ }
+ }
+ } catch (InterruptedException ie) {
+ // (Re-)Cancel if current thread also interrupted
+ stream.shutdownNow();
+ // Preserve interrupt status
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ public long getMinLatency() {
+ return minLatency;
+ }
+
+ public long getMaxApiBatch() {
+ return maxApiBatch;
+ }
+
+ public SysomosClient getClient() {
+ return client;
+ }
+
+ protected void enqueueItem(StreamsDatum datum) {
+ boolean success;
+ do {
+ try {
+ pauseForSpace(); //Dont lock before this pause. We don't want to block the readCurrent method
+ lock.readLock().lock();
+ success = providerQueue.offer(datum);
+ Thread.yield();
+ }finally {
+ lock.readLock().unlock();
+ }
+ }
+ while (!success);
+ }
+
+ /**
+ * Wait for the queue size to be below threshold before allowing execution to continue on this thread
+ */
+ private void pauseForSpace() {
+ while(this.providerQueue.size() >= maxQueued) {
+ LOGGER.trace("Sleeping the current thread due to a full queue");
+ try {
+ Thread.sleep(100);
+ LOGGER.trace("Resuming thread after wait period");
+ } catch (InterruptedException e) {
+ LOGGER.warn("Thread was interrupted", e);
+ }
+ }
+ }
+
+ private Queue<StreamsDatum> constructQueue() {
+ return Queues.newConcurrentLinkedQueue();
+ }
+}
[2/2] git commit: Added new converter
Posted by mf...@apache.org.
Added new converter
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/e6ffe29e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/e6ffe29e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/e6ffe29e
Branch: refs/heads/master
Commit: e6ffe29e8017592f6df881c8f8efc546041536be
Parents: 40ab159
Author: mfranklin <mf...@apache.org>
Authored: Wed Apr 30 10:56:02 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Wed Apr 30 10:56:02 2014 -0400
----------------------------------------------------------------------
.../streams/mongo/MongoPersistWriter.java | 3 +-
.../SysomosBeatActivityConverter.java | 137 +++++++++++++++++++
.../sysomos/proessor/SysomosTypeConverter.java | 56 ++++++++
.../apache/streams/data/ActivitySerializer.java | 1 +
4 files changed, 196 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e6ffe29e/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java b/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java
index a4a089a..80541e8 100644
--- a/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java
+++ b/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java
@@ -14,6 +14,7 @@ import com.typesafe.config.Config;
import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsPersistWriter;
+import org.apache.streams.jackson.StreamsJacksonMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,7 +35,7 @@ public class MongoPersistWriter implements StreamsPersistWriter, Runnable {
protected volatile Queue<StreamsDatum> persistQueue;
- private ObjectMapper mapper = new ObjectMapper();
+ private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
private volatile AtomicLong lastWrite = new AtomicLong(System.currentTimeMillis());
private ScheduledExecutorService backgroundFlushTask = Executors.newSingleThreadScheduledExecutor();
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e6ffe29e/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/conversion/SysomosBeatActivityConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/conversion/SysomosBeatActivityConverter.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/conversion/SysomosBeatActivityConverter.java
new file mode 100644
index 0000000..e084946
--- /dev/null
+++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/conversion/SysomosBeatActivityConverter.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.sysomos.conversion;
+
+import com.google.common.collect.Maps;
+import com.sysomos.xml.BeatApi;
+import org.apache.commons.lang.StringUtils;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.ActivityObject;
+import org.apache.streams.pojo.json.Actor;
+import org.apache.streams.pojo.json.Provider;
+import org.joda.time.DateTime;
+
+import java.util.Map;
+
+import static org.apache.streams.data.util.ActivityUtil.*;
+
+/**
+ * Converts an instance of a {@link com.sysomos.xml.BeatApi.BeatResponse.Beat} to an {@link org.apache.streams.pojo.json.Activity}
+ */
+public class SysomosBeatActivityConverter {
+
+ public static final String LANGUAGE_KEY = "LANGUAGE";
+
+ public Activity convert(BeatApi.BeatResponse.Beat beat) {
+ Activity converted = new Activity();
+ converted.setId(beat.getDocid());
+ converted.setVerb("posted");
+ converted.setContent(beat.getContent());
+ converted.setTitle(beat.getTitle());
+ converted.setPublished(new DateTime(beat.getTime()));
+ converted.setUrl(beat.getLink());
+ converted.setActor(new Actor());
+ Map<String, BeatApi.BeatResponse.Beat.Tag> mappedTags = mapTags(beat);
+ Map<String, Object> extensions = ensureExtensions(converted);
+ setLocation(beat, extensions);
+ setObject(beat, converted);
+ setProvider(beat, converted);
+ setLanguage(mappedTags, extensions);
+ extensions.put("sysomos", beat);
+
+ setChannelSpecificValues(beat, converted, mappedTags);
+
+ return converted;
+ }
+
+ protected void setChannelSpecificValues(BeatApi.BeatResponse.Beat beat, Activity converted, Map<String, BeatApi.BeatResponse.Beat.Tag> mappedTags) {
+ String mediaType = beat.getMediaType();
+ String lowerMediaType = mediaType.toLowerCase();
+ Actor actor = converted.getActor();
+ ActivityObject object = converted.getObject();
+ if ("TWITTER".equals(mediaType)) {
+ actor.setId(getPersonId(lowerMediaType, beat.getHost()));
+ actor.setDisplayName(beat.getHost());
+ actor.setUrl("http://twitter.com/" + beat.getHost());
+ object.setObjectType("tweet");
+ object.setId(getObjectId(lowerMediaType, "tweet", beat.getTweetid()));
+ } else if ("FACEBOOK".equals(mediaType)) {
+ actor.setId(getPersonId(lowerMediaType, mappedTags.get("FBID").getValue()));
+ actor.setDisplayName(beat.getTitle());
+ actor.setUrl(beat.getHost());
+ object.setObjectType("post");
+ object.setId(getObjectId(lowerMediaType, "post", String.valueOf(converted.getContent().hashCode())));
+ } else {
+ actor.setId(null);
+ actor.setDisplayName(null);
+ actor.setUrl(null);
+ object.setObjectType("post");
+ object.setId(getObjectId(lowerMediaType, "post", String.valueOf(converted.getContent().hashCode())));
+ }
+ }
+
+ protected void setLanguage(Map<String, BeatApi.BeatResponse.Beat.Tag> mappedTags, Map<String, Object> extensions) {
+ if(mappedTags.containsKey(LANGUAGE_KEY)) {
+ extensions.put(LANGUAGE_EXTENSION, mappedTags.get(LANGUAGE_KEY).getValue());
+ }
+ }
+
+ protected void setObject(BeatApi.BeatResponse.Beat beat, Activity converted) {
+ ActivityObject object = new ActivityObject();
+ converted.setObject(object);
+ object.setUrl(beat.getLink());
+ object.setContent(beat.getContent());
+ }
+
+ @SuppressWarnings("unchecked")
+ protected void setLocation(BeatApi.BeatResponse.Beat beat, Map<String, Object> extensions) {
+ Map<String, Object> location;
+ String country = beat.getLocation().getCountry();
+ if(StringUtils.isNotBlank(country)) {
+ if (extensions.containsKey(LOCATION_EXTENSION)) {
+ location = (Map<String, Object>) extensions.get(LOCATION_EXTENSION);
+ } else {
+ location = Maps.newHashMap();
+ extensions.put(LOCATION_EXTENSION, location);
+ }
+ location.put(LOCATION_EXTENSION_COUNTRY, country);
+ }
+ }
+
+ protected void setProvider(BeatApi.BeatResponse.Beat beat, Activity converted) {
+ Provider provider = new Provider();
+ String mediaType = beat.getMediaType().toLowerCase();
+ provider.setId(getProviderId(mediaType));
+ provider.setDisplayName(StringUtils.capitalize(mediaType));
+ converted.setProvider(provider);
+ }
+
+ protected Map<String, BeatApi.BeatResponse.Beat.Tag> mapTags(BeatApi.BeatResponse.Beat beat) {
+ Map<String, BeatApi.BeatResponse.Beat.Tag> tags = Maps.newHashMap();
+ for(BeatApi.BeatResponse.Beat.Tag tag : beat.getTag()) {
+ if(tag.getSystemType() != null) {
+ tags.put(tag.getSystemType(), tag);
+ }
+ }
+ return tags;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e6ffe29e/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/org/apache/streams/sysomos/proessor/SysomosTypeConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/org/apache/streams/sysomos/proessor/SysomosTypeConverter.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/org/apache/streams/sysomos/proessor/SysomosTypeConverter.java
new file mode 100644
index 0000000..187d402
--- /dev/null
+++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/org/apache/streams/sysomos/proessor/SysomosTypeConverter.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.sysomos.org.apache.streams.sysomos.proessor;
+
+import com.google.common.collect.Lists;
+import com.sysomos.xml.BeatApi;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.sysomos.conversion.SysomosBeatActivityConverter;
+
+import java.util.List;
+
+/**
+ * Stream processor that converts Sysomos type to Activity
+ */
+public class SysomosTypeConverter implements StreamsProcessor {
+
+ private SysomosBeatActivityConverter converter;
+
+ @Override
+ public List<StreamsDatum> process(StreamsDatum entry) {
+ if(entry.getDocument() instanceof BeatApi.BeatResponse.Beat) {
+ entry.setDocument(converter.convert((BeatApi.BeatResponse.Beat)entry.getDocument()));
+ return Lists.newArrayList(entry);
+ } else {
+ return Lists.newArrayList();
+ }
+ }
+
+ @Override
+ public void prepare(Object configurationObject) {
+ converter = new SysomosBeatActivityConverter();
+ }
+
+ @Override
+ public void cleanUp() {
+ //NOP
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e6ffe29e/streams-pojo/src/main/java/org/apache/streams/data/ActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/java/org/apache/streams/data/ActivitySerializer.java b/streams-pojo/src/main/java/org/apache/streams/data/ActivitySerializer.java
index ad3809f..23903e5 100644
--- a/streams-pojo/src/main/java/org/apache/streams/data/ActivitySerializer.java
+++ b/streams-pojo/src/main/java/org/apache/streams/data/ActivitySerializer.java
@@ -23,6 +23,7 @@ import org.apache.streams.pojo.json.Activity;
import java.util.List;
+//TODO: Change the name of this class to ActivityConverter STREAMS-68
/**
* Serializes and deserializes Activities
*/