You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by mf...@apache.org on 2014/06/23 18:17:02 UTC
[01/10] git commit: STREAMS-71 | Added date parameters
Repository: incubator-streams
Updated Branches:
refs/heads/master 7d6194d36 -> bce1657d4
STREAMS-71 | Added date parameters
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/d4202050
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/d4202050
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/d4202050
Branch: refs/heads/master
Commit: d4202050f8f09e33dd892ceb742b4a5b4b029e54
Parents: ec2ae35
Author: mfranklin <mf...@apache.org>
Authored: Fri Jun 13 13:01:26 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Fri Jun 13 13:01:26 2014 -0400
----------------------------------------------------------------------
.../sysomos/provider/SysomosProvider.java | 39 ++++++++++++++++++--
1 file changed, 35 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d4202050/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java
index 0073ac2..b8b5e56 100644
--- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java
+++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java
@@ -27,6 +27,7 @@ import org.apache.commons.lang.NotImplementedException;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProvider;
import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.data.util.RFC3339Utils;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,10 +52,13 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
*/
public class SysomosProvider implements StreamsProvider {
+
public static enum Mode { CONTINUOUS, BACKFILL_AND_TERMINATE }
private static final Logger LOGGER = LoggerFactory.getLogger(SysomosProvider.class);
+ public static final String ENDING_TIME_KEY = "addedBefore";
+ public static final String STARTING_TIME_KEY = "addedAfter";
public static final String STREAMS_ID = "SysomosProvider";
public static final String MODE_KEY = "mode";
public static final String STARTING_DOCS_KEY = "startingDocs";
@@ -75,6 +79,8 @@ public class SysomosProvider implements StreamsProvider {
private SysomosConfiguration config;
private ScheduledExecutorService stream;
private Map<String, String> documentIds;
+ private Map<String, String> addedBefore;
+ private Map<String, String> addedAfter;
private Mode mode = Mode.CONTINUOUS;
private boolean started = false;
@@ -118,9 +124,7 @@ public class SysomosProvider implements StreamsProvider {
LOGGER.trace("Producer not started. Initializing");
stream = Executors.newScheduledThreadPool(getConfig().getHeartbeatIds().size() + 1);
for (String heartbeatId : getConfig().getHeartbeatIds()) {
- Runnable task = documentIds != null && documentIds.containsKey(heartbeatId) ?
- new SysomosHeartbeatStream(this, heartbeatId, documentIds.get(heartbeatId)) :
- new SysomosHeartbeatStream(this, heartbeatId);
+ Runnable task = createStream(heartbeatId);
stream.scheduleWithFixedDelay(task, 0, this.scheduledLatency, TimeUnit.MILLISECONDS);
LOGGER.info("Started producer task for heartbeat {}", heartbeatId);
}
@@ -211,10 +215,23 @@ public class SysomosProvider implements StreamsProvider {
while (!success);
}
+ protected SysomosHeartbeatStream createStream(String heartbeatId) {
+ String beforeTime = addedAfter != null && addedAfter.containsKey(heartbeatId) ? addedAfter.get(heartbeatId) : null;
+ String afterTime = addedBefore != null && addedBefore.containsKey(heartbeatId) ? addedBefore.get(heartbeatId) : null;
+
+ if(documentIds != null && documentIds.containsKey(heartbeatId)) {
+ return new SysomosHeartbeatStream(this, heartbeatId, documentIds.get(heartbeatId));
+ }
+ if(afterTime != null || beforeTime != null) {
+ return new SysomosHeartbeatStream(this, heartbeatId, RFC3339Utils.parseToUTC(beforeTime), RFC3339Utils.parseToUTC(afterTime));
+ }
+ return new SysomosHeartbeatStream(this, heartbeatId);
+ }
+
/**
* Wait for the queue size to be below threshold before allowing execution to continue on this thread
*/
- private void pauseForSpace() {
+ protected void pauseForSpace() {
while(this.providerQueue.size() >= maxQueued) {
LOGGER.trace("Sleeping the current thread due to a full queue");
try {
@@ -242,6 +259,20 @@ public class SysomosProvider implements StreamsProvider {
}
this.documentIds = (Map)configIds;
}
+ if(configMap.containsKey(STARTING_TIME_KEY)) {
+ Object configIds = configMap.get(STARTING_TIME_KEY);
+ if(!(configIds instanceof Map)) {
+ throw new IllegalStateException("Invalid configuration. Added after key must be an instance of Map<String,String> but was " + configIds);
+ }
+ this.addedAfter = (Map)configIds;
+ }
+ if(configMap.containsKey(ENDING_TIME_KEY)) {
+ Object configIds = configMap.get(ENDING_TIME_KEY);
+ if(!(configIds instanceof Map)) {
+ throw new IllegalStateException("Invalid configuration. Added before key must be an instance of Map<String,String> but was " + configIds);
+ }
+ this.addedBefore = (Map)configIds;
+ }
}
private Queue<StreamsDatum> constructQueue() {
[06/10] git commit: Merge branch 'master' into STREAMS-71
Posted by mf...@apache.org.
Merge branch 'master' into STREAMS-71
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/78066e6e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/78066e6e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/78066e6e
Branch: refs/heads/master
Commit: 78066e6e458b9977e93a7364531470282fbdcf1c
Parents: 899b2e2 16343c6
Author: mfranklin <mf...@apache.org>
Authored: Mon Jun 23 08:15:29 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Mon Jun 23 08:15:29 2014 -0400
----------------------------------------------------------------------
.../streams-provider-datasift/pom.xml | 18 ++-
.../streams/datasift/csdl/DatasiftCsdlUtil.java | 114 +++++++++++++++++++
.../streams/jackson/StreamsJacksonMapper.java | 4 +-
3 files changed, 133 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
[07/10] git commit: Merge branch 'master' into STREAMS-71
Posted by mf...@apache.org.
Merge branch 'master' into STREAMS-71
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/14a95328
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/14a95328
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/14a95328
Branch: refs/heads/master
Commit: 14a953283edbc78f666bbf1c326ee349b86a92db
Parents: 78066e6 7d6194d
Author: mfranklin <mf...@apache.org>
Authored: Mon Jun 23 09:21:12 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Mon Jun 23 09:21:12 2014 -0400
----------------------------------------------------------------------
streams-contrib/streams-provider-datasift/pom.xml | 4 ----
1 file changed, 4 deletions(-)
----------------------------------------------------------------------
[04/10] git commit: STREAMS-71 | Updated to handle multiple modes of
backfill
Posted by mf...@apache.org.
STREAMS-71 | Updated to handle multiple modes of backfill
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/409b9993
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/409b9993
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/409b9993
Branch: refs/heads/master
Commit: 409b9993e247ce685e9718a246d65c51f64117d5
Parents: 23fc011
Author: mfranklin <mf...@apache.org>
Authored: Fri Jun 13 14:22:18 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Fri Jun 13 14:22:18 2014 -0400
----------------------------------------------------------------------
.../provider/SysomosHeartbeatStream.java | 45 +++++++++++++++-----
1 file changed, 34 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/409b9993/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
index c234cb1..c0bda15 100644
--- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
+++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
@@ -44,6 +44,7 @@ public class SysomosHeartbeatStream implements Runnable {
private String lastID;
private DateTime beforeTime;
private DateTime afterTime;
+ private DateTime lastRunTime;
private int offsetCount = 0;
private boolean enabled = true;
@@ -76,6 +77,7 @@ public class SysomosHeartbeatStream implements Runnable {
public void run() {
QueryResult result;
String mostCurrentId = null;
+ lastRunTime = DateTime.now();
//Iff we are trying to get to a specific document ID, continue to query after minimum delay
do {
LOGGER.debug("Querying API to match last ID of {}", lastID);
@@ -85,17 +87,22 @@ public class SysomosHeartbeatStream implements Runnable {
if(offsetCount == 1) {
mostCurrentId = result.getCurrentId();
}
+ updateOffset(result);
sleep();
- } while (shouldBackfill(result));
- updateState(result, mostCurrentId);
+ } while (offsetCount > 0);
+ updateState(result, mostCurrentId);
LOGGER.debug("Completed current execution with a final docID of {}", lastID);
}
protected void updateState(QueryResult result, String mostCurrentId) {
- //Set the last ID so that the next time we are executed we will continue to query only so long as we haven't
- //found the specific ID
- lastID = mostCurrentId == null ? result.getCurrentId() : mostCurrentId;
+ if(OperatingMode.DOC_MATCH.equals(mode)) {
+ //Set the last ID so that the next time we are executed we will continue to query only so long as we haven't
+ //found the specific ID
+ lastID = mostCurrentId == null ? result.getCurrentId() : mostCurrentId;
+ } else {
+ afterTime = lastRunTime;
+ }
if(SysomosProvider.Mode.BACKFILL_AND_TERMINATE.equals(provider.getMode())) {
shutdown();
@@ -103,8 +110,13 @@ public class SysomosHeartbeatStream implements Runnable {
}
}
- protected boolean shouldBackfill(QueryResult result) {
- return lastID != null && !result.isMatchedLastId();
+ protected void updateOffset(QueryResult result) {
+ if(OperatingMode.DOC_MATCH.equals(mode)) {
+ //Reset the offset iff we have found a match or this is the first execution
+ offsetCount = lastID == null || result.isMatchedLastId() ? 0 : offsetCount + 1;
+ } else {
+ offsetCount = result.getResponseSize() == 0 ? 0 : offsetCount + 1;
+ }
}
protected void sleep() {
@@ -121,6 +133,7 @@ public class SysomosHeartbeatStream implements Runnable {
String currentId = null;
boolean matched = false;
+ int responseSize = 0;
if(response != null) {
for (BeatApi.BeatResponse.Beat beat : response.getBeat()) {
String docId = beat.getDocid();
@@ -137,10 +150,9 @@ public class SysomosHeartbeatStream implements Runnable {
item.getMetadata().put("heartbeat", this.heartbeatId);
this.provider.enqueueItem(item);
}
- //Reset the offset iff we have found a match or this is the first execution
- offsetCount = lastID == null || matched ? 0 : offsetCount + 1;
+ responseSize = response.getCount();
}
- return new QueryResult(matched, currentId);
+ return new QueryResult(matched, currentId, responseSize);
}
protected BeatApi.BeatResponse executeAPIRequest() {
@@ -175,10 +187,13 @@ public class SysomosHeartbeatStream implements Runnable {
protected class QueryResult {
private boolean matchedLastId;
private String currentId;
+ private int responseSize;
- private QueryResult(boolean matchedLastId, String currentId) {
+
+ public QueryResult(boolean matchedLastId, String currentId, int responseSize) {
this.matchedLastId = matchedLastId;
this.currentId = currentId;
+ this.responseSize = responseSize;
}
public boolean isMatchedLastId() {
@@ -196,5 +211,13 @@ public class SysomosHeartbeatStream implements Runnable {
public void setCurrentId(String currentId) {
this.currentId = currentId;
}
+
+ public int getResponseSize() {
+ return responseSize;
+ }
+
+ public void setResponseSize(int responseSize) {
+ this.responseSize = responseSize;
+ }
}
}
[08/10] git commit: Fixed error where test sources were set to java17
by pom instead of profile
Posted by mf...@apache.org.
Fixed error where test sources were set to java17 by pom instead of profile
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/2899af4f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/2899af4f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/2899af4f
Branch: refs/heads/master
Commit: 2899af4f0e22818dcd8cf60ad02ecec8d6078cd4
Parents: 14a9532
Author: mfranklin <mf...@apache.org>
Authored: Mon Jun 23 10:12:16 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Mon Jun 23 10:12:16 2014 -0400
----------------------------------------------------------------------
streams-contrib/streams-provider-datasift/pom.xml | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2899af4f/streams-contrib/streams-provider-datasift/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/pom.xml b/streams-contrib/streams-provider-datasift/pom.xml
index 3b9f96f..5c5f674 100644
--- a/streams-contrib/streams-provider-datasift/pom.xml
+++ b/streams-contrib/streams-provider-datasift/pom.xml
@@ -83,7 +83,7 @@
<build>
<sourceDirectory>src/main/java</sourceDirectory>
- <testSourceDirectory>src/test/java17</testSourceDirectory>
+ <testSourceDirectory>src/test/java</testSourceDirectory>
<resources>
<resource>
<directory>src/main/resources</directory>
[09/10] git commit: STREAMS-71 | Updated logging
Posted by mf...@apache.org.
STREAMS-71 | Updated logging
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/4ca2b6cd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/4ca2b6cd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/4ca2b6cd
Branch: refs/heads/master
Commit: 4ca2b6cddbd2a79c9cba04a049ca4d30126d7185
Parents: 2899af4
Author: mfranklin <mf...@apache.org>
Authored: Mon Jun 23 11:32:16 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Mon Jun 23 11:32:16 2014 -0400
----------------------------------------------------------------------
.../sysomos/proessor/SysomosTypeConverter.java | 56 --------------------
.../sysomos/processor/SysomosTypeConverter.java | 56 ++++++++++++++++++++
.../provider/SysomosHeartbeatStream.java | 6 +--
.../sysomos/provider/SysomosProvider.java | 12 +++--
4 files changed, 67 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4ca2b6cd/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/org/apache/streams/sysomos/proessor/SysomosTypeConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/org/apache/streams/sysomos/proessor/SysomosTypeConverter.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/org/apache/streams/sysomos/proessor/SysomosTypeConverter.java
deleted file mode 100644
index 187d402..0000000
--- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/org/apache/streams/sysomos/proessor/SysomosTypeConverter.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.sysomos.org.apache.streams.sysomos.proessor;
-
-import com.google.common.collect.Lists;
-import com.sysomos.xml.BeatApi;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProcessor;
-import org.apache.streams.sysomos.conversion.SysomosBeatActivityConverter;
-
-import java.util.List;
-
-/**
- * Stream processor that converts Sysomos type to Activity
- */
-public class SysomosTypeConverter implements StreamsProcessor {
-
- private SysomosBeatActivityConverter converter;
-
- @Override
- public List<StreamsDatum> process(StreamsDatum entry) {
- if(entry.getDocument() instanceof BeatApi.BeatResponse.Beat) {
- entry.setDocument(converter.convert((BeatApi.BeatResponse.Beat)entry.getDocument()));
- return Lists.newArrayList(entry);
- } else {
- return Lists.newArrayList();
- }
- }
-
- @Override
- public void prepare(Object configurationObject) {
- converter = new SysomosBeatActivityConverter();
- }
-
- @Override
- public void cleanUp() {
- //NOP
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4ca2b6cd/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/processor/SysomosTypeConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/processor/SysomosTypeConverter.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/processor/SysomosTypeConverter.java
new file mode 100644
index 0000000..db9f416
--- /dev/null
+++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/processor/SysomosTypeConverter.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.sysomos.processor;
+
+import com.google.common.collect.Lists;
+import com.sysomos.xml.BeatApi;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.sysomos.conversion.SysomosBeatActivityConverter;
+
+import java.util.List;
+
+/**
+ * Stream processor that converts Sysomos type to Activity
+ */
+public class SysomosTypeConverter implements StreamsProcessor {
+
+ private SysomosBeatActivityConverter converter;
+
+ @Override
+ public List<StreamsDatum> process(StreamsDatum entry) {
+ if(entry.getDocument() instanceof BeatApi.BeatResponse.Beat) {
+ entry.setDocument(converter.convert((BeatApi.BeatResponse.Beat)entry.getDocument()));
+ return Lists.newArrayList(entry);
+ } else {
+ return Lists.newArrayList();
+ }
+ }
+
+ @Override
+ public void prepare(Object configurationObject) {
+ converter = new SysomosBeatActivityConverter();
+ }
+
+ @Override
+ public void cleanUp() {
+ //NOP
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4ca2b6cd/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
index c0bda15..5cc993e 100644
--- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
+++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
@@ -80,7 +80,7 @@ public class SysomosHeartbeatStream implements Runnable {
lastRunTime = DateTime.now();
//Iff we are trying to get to a specific document ID, continue to query after minimum delay
do {
- LOGGER.debug("Querying API to match last ID of {}", lastID);
+ LOGGER.debug("Querying API to match last ID of {} or time range of {} - {}", lastID, afterTime, beforeTime);
result = queryAPI();
//Ensure that we are only assigning lastID to the latest ID, even if there is backfill query.
//Since offset is calcuated at the end of the run, if we detect the need to backfill, it will increment to 1
@@ -92,7 +92,7 @@ public class SysomosHeartbeatStream implements Runnable {
} while (offsetCount > 0);
updateState(result, mostCurrentId);
- LOGGER.debug("Completed current execution with a final docID of {}", lastID);
+ LOGGER.debug("Completed current execution with a final docID of {} or time of {}", lastID, afterTime);
}
protected void updateState(QueryResult result, String mostCurrentId) {
@@ -106,7 +106,7 @@ public class SysomosHeartbeatStream implements Runnable {
if(SysomosProvider.Mode.BACKFILL_AND_TERMINATE.equals(provider.getMode())) {
shutdown();
- LOGGER.info("Completed backfill to {} for heartbeat {}", lastID, heartbeatId);
+ LOGGER.info("Completed backfill to {} for heartbeat {}", OperatingMode.DOC_MATCH.equals(mode) ? lastID : afterTime, heartbeatId);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4ca2b6cd/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java
index 128bc43..1b8f164 100644
--- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java
+++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java
@@ -221,14 +221,18 @@ public class SysomosProvider implements StreamsProvider {
}
protected SysomosHeartbeatStream createStream(String heartbeatId) {
- String beforeTime = addedAfter != null && addedAfter.containsKey(heartbeatId) ? addedAfter.get(heartbeatId) : null;
- String afterTime = addedBefore != null && addedBefore.containsKey(heartbeatId) ? addedBefore.get(heartbeatId) : null;
+ String afterTime = addedAfter != null && addedAfter.containsKey(heartbeatId) ? addedAfter.get(heartbeatId) : null;
+ String beforeTime = addedBefore != null && addedBefore.containsKey(heartbeatId) ? addedBefore.get(heartbeatId) : null;
if(documentIds != null && documentIds.containsKey(heartbeatId)) {
return new SysomosHeartbeatStream(this, heartbeatId, documentIds.get(heartbeatId));
}
- if(afterTime != null || beforeTime != null) {
- return new SysomosHeartbeatStream(this, heartbeatId, RFC3339Utils.parseToUTC(beforeTime), RFC3339Utils.parseToUTC(afterTime));
+ if(afterTime != null) {
+ if(beforeTime != null) {
+ return new SysomosHeartbeatStream(this, heartbeatId, RFC3339Utils.parseToUTC(beforeTime), RFC3339Utils.parseToUTC(afterTime));
+ } else {
+ return new SysomosHeartbeatStream(this, heartbeatId, null, RFC3339Utils.parseToUTC(afterTime));
+ }
}
return new SysomosHeartbeatStream(this, heartbeatId);
}
[10/10] git commit: STREAMS-71 | Updated time format
Posted by mf...@apache.org.
STREAMS-71 | Updated time format
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/bce1657d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/bce1657d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/bce1657d
Branch: refs/heads/master
Commit: bce1657d413606503ede59437c107e63de38f3d6
Parents: 4ca2b6c
Author: mfranklin <mf...@apache.org>
Authored: Mon Jun 23 12:13:03 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Mon Jun 23 12:13:03 2014 -0400
----------------------------------------------------------------------
.../main/java/org/apache/streams/sysomos/util/SysomosUtils.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/bce1657d/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/util/SysomosUtils.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/util/SysomosUtils.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/util/SysomosUtils.java
index 338ce8f..3b6a843 100644
--- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/util/SysomosUtils.java
+++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/util/SysomosUtils.java
@@ -41,7 +41,7 @@ import java.util.regex.Pattern;
public class SysomosUtils {
public static final Pattern CODE_PATTERN = Pattern.compile("code: ([0-9]+)");
- public static final DateTimeFormatter SYSOMOS_DATE_FORMATTER = DateTimeFormat.forPattern("yyyy-MM-dd'T'hh:mm:ssZ");
+ public static final DateTimeFormatter SYSOMOS_DATE_FORMATTER = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss'Z'").withZoneUTC();
private final static Logger LOGGER = LoggerFactory.getLogger(SysomosUtils.class);
private SysomosUtils() {}
[02/10] git commit: STREAMS-71 | Updated config to use date parameters
Posted by mf...@apache.org.
STREAMS-71 | Updated config to use date parameters
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/7afc27cf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/7afc27cf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/7afc27cf
Branch: refs/heads/master
Commit: 7afc27cf9d42b36aec343ac0f8be2626ed269671
Parents: d420205
Author: mfranklin <mf...@apache.org>
Authored: Fri Jun 13 13:01:45 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Fri Jun 13 13:01:45 2014 -0400
----------------------------------------------------------------------
.../provider/SysomosHeartbeatStream.java | 34 +++++++++++++++++---
1 file changed, 29 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afc27cf/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
index 2a40450..f6f4f29 100644
--- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
+++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
@@ -21,6 +21,7 @@ package org.apache.streams.sysomos.provider;
import com.sysomos.xml.BeatApi;
import org.apache.streams.core.StreamsDatum;
+import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -29,6 +30,8 @@ import org.slf4j.LoggerFactory;
*/
public class SysomosHeartbeatStream implements Runnable {
+ private static enum OperatingMode { DATE, DOC_MATCH}
+
private final static Logger LOGGER = LoggerFactory.getLogger(SysomosHeartbeatStream.class);
private final SysomosProvider provider;
@@ -36,23 +39,37 @@ public class SysomosHeartbeatStream implements Runnable {
private final String heartbeatId;
private final long maxApiBatch;
private final long minLatency;
+ private final OperatingMode mode;
private String lastID;
+ private DateTime beforeTime;
+ private DateTime afterTime;
private int offsetCount = 0;
private boolean enabled = true;
public SysomosHeartbeatStream(SysomosProvider provider, String heartbeatId) {
- this(provider, heartbeatId, null);
+ this(provider, heartbeatId, OperatingMode.DATE);
+ }
+
+ public SysomosHeartbeatStream(SysomosProvider provider, String heartbeatId, DateTime beforeTime, DateTime afterTime) {
+ this(provider, heartbeatId, OperatingMode.DATE);
+ this.beforeTime = beforeTime;
+ this.afterTime = afterTime;
}
- public SysomosHeartbeatStream(SysomosProvider provider, String heartbeatId, String docId) {
+ public SysomosHeartbeatStream(SysomosProvider provider, String heartbeatId, String documentId) {
+ this(provider, heartbeatId, OperatingMode.DOC_MATCH);
+ this.lastID = documentId;
+ }
+
+ public SysomosHeartbeatStream(SysomosProvider provider, String heartbeatId, OperatingMode mode) {
this.provider = provider;
this.heartbeatId = heartbeatId;
- this.lastID = docId;
this.client = provider.getClient();
this.maxApiBatch = provider.getMaxApiBatch();
this.minLatency = provider.getMinLatency();
+ this.mode = mode;
}
@Override
@@ -94,10 +111,17 @@ public class SysomosHeartbeatStream implements Runnable {
BeatApi.BeatResponse response = null;
try {
if(enabled) {
- response = this.client.createRequestBuilder()
+ RequestBuilder requestBuilder = this.client.createRequestBuilder()
.setHeartBeatId(heartbeatId)
.setOffset(offsetCount * maxApiBatch)
- .setReturnSetSize(maxApiBatch).execute();
+ .setReturnSetSize(maxApiBatch);
+ if(beforeTime != null) {
+ requestBuilder.setAddedBeforeDate(beforeTime);
+ }
+ if(afterTime != null) {
+ requestBuilder.setAddedAfterDate(afterTime);
+ }
+ response = requestBuilder.execute();
LOGGER.debug("Received {} results from API query", response.getCount());
}
[03/10] git commit: STREAMS-71 | Updated organization for easier
update of operation
Posted by mf...@apache.org.
STREAMS-71 | Updated organization for easier update of operation
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/23fc0119
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/23fc0119
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/23fc0119
Branch: refs/heads/master
Commit: 23fc01199933a97be8414826f91eca12174b3789
Parents: 7afc27c
Author: mfranklin <mf...@apache.org>
Authored: Fri Jun 13 13:08:31 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Fri Jun 13 13:08:31 2014 -0400
----------------------------------------------------------------------
.../provider/SysomosHeartbeatStream.java | 73 ++++++++++++--------
1 file changed, 46 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/23fc0119/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
index f6f4f29..c234cb1 100644
--- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
+++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
@@ -79,24 +79,32 @@ public class SysomosHeartbeatStream implements Runnable {
//Iff we are trying to get to a specific document ID, continue to query after minimum delay
do {
LOGGER.debug("Querying API to match last ID of {}", lastID);
- result = executeAPIRequest();
+ result = queryAPI();
//Ensure that we are only assigning lastID to the latest ID, even if there is backfill query.
//Since offset is calcuated at the end of the run, if we detect the need to backfill, it will increment to 1
if(offsetCount == 1) {
mostCurrentId = result.getCurrentId();
}
sleep();
- } while (lastID != null && !result.isMatchedLastId());
+ } while (shouldBackfill(result));
+ updateState(result, mostCurrentId);
+
+ LOGGER.debug("Completed current execution with a final docID of {}", lastID);
+ }
+
+ protected void updateState(QueryResult result, String mostCurrentId) {
//Set the last ID so that the next time we are executed we will continue to query only so long as we haven't
//found the specific ID
lastID = mostCurrentId == null ? result.getCurrentId() : mostCurrentId;
if(SysomosProvider.Mode.BACKFILL_AND_TERMINATE.equals(provider.getMode())) {
- provider.signalComplete(heartbeatId);
- enabled = false;
+ shutdown();
LOGGER.info("Completed backfill to {} for heartbeat {}", lastID, heartbeatId);
}
- LOGGER.debug("Completed current execution with a final docID of {}", lastID);
+ }
+
+ protected boolean shouldBackfill(QueryResult result) {
+ return lastID != null && !result.isMatchedLastId();
}
protected void sleep() {
@@ -104,30 +112,12 @@ public class SysomosHeartbeatStream implements Runnable {
Thread.sleep(this.minLatency);
} catch (InterruptedException e) {
LOGGER.warn("Thread interrupted while sleeping minimum delay", e);
+ shutdown();
}
}
- protected QueryResult executeAPIRequest() {
- BeatApi.BeatResponse response = null;
- try {
- if(enabled) {
- RequestBuilder requestBuilder = this.client.createRequestBuilder()
- .setHeartBeatId(heartbeatId)
- .setOffset(offsetCount * maxApiBatch)
- .setReturnSetSize(maxApiBatch);
- if(beforeTime != null) {
- requestBuilder.setAddedBeforeDate(beforeTime);
- }
- if(afterTime != null) {
- requestBuilder.setAddedAfterDate(afterTime);
- }
- response = requestBuilder.execute();
-
- LOGGER.debug("Received {} results from API query", response.getCount());
- }
- } catch (Exception e) {
- LOGGER.warn("Error querying Sysomos API", e);
- }
+ protected QueryResult queryAPI() {
+ BeatApi.BeatResponse response = executeAPIRequest();
String currentId = null;
boolean matched = false;
@@ -153,7 +143,36 @@ public class SysomosHeartbeatStream implements Runnable {
return new QueryResult(matched, currentId);
}
- private class QueryResult {
+ protected BeatApi.BeatResponse executeAPIRequest() {
+ BeatApi.BeatResponse response = null;
+ try {
+ if(enabled) {
+ RequestBuilder requestBuilder = this.client.createRequestBuilder()
+ .setHeartBeatId(heartbeatId)
+ .setOffset(offsetCount * maxApiBatch)
+ .setReturnSetSize(maxApiBatch);
+ if(beforeTime != null) {
+ requestBuilder.setAddedBeforeDate(beforeTime);
+ }
+ if(afterTime != null) {
+ requestBuilder.setAddedAfterDate(afterTime);
+ }
+ response = requestBuilder.execute();
+
+ LOGGER.debug("Received {} results from API query", response.getCount());
+ }
+ } catch (Exception e) {
+ LOGGER.warn("Error querying Sysomos API", e);
+ }
+ return response;
+ }
+
+ protected void shutdown() {
+ provider.signalComplete(heartbeatId);
+ enabled = false;
+ }
+
+ protected class QueryResult {
private boolean matchedLastId;
private String currentId;
[05/10] git commit: Merge branch 'master' into STREAMS-71
Posted by mf...@apache.org.
Merge branch 'master' into STREAMS-71
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/899b2e25
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/899b2e25
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/899b2e25
Branch: refs/heads/master
Commit: 899b2e25a130273f5a054dabd92f7e350c31e6a4
Parents: 409b999 e73d2ad
Author: mfranklin <mf...@apache.org>
Authored: Fri Jun 20 15:24:42 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Fri Jun 20 15:24:42 2014 -0400
----------------------------------------------------------------------
.../org/apache/streams/s3/S3PersistReader.java | 9 ++++-
.../streams/console/ConsolePersistReader.java | 5 +++
.../ElasticsearchPersistReader.java | 8 ++++-
.../streams/hdfs/WebHdfsPersistReader.java | 9 ++++-
.../streams/kafka/KafkaPersistReader.java | 5 +++
.../provider/DatasiftStreamProvider.java | 5 +++
.../google/gmail/provider/GMailProvider.java | 8 ++++-
.../google/gplus/provider/GPlusProvider.java | 5 +++
.../streams/data/moreover/MoreoverProvider.java | 5 +++
.../streams/rss/provider/RssStreamProvider.java | 6 ++++
.../sysomos/provider/SysomosProvider.java | 7 +++-
.../processor/TwitterProfileProcessor.java | 13 +++----
.../twitter/provider/TwitterStreamProvider.java | 5 +++
.../provider/TwitterTimelineProvider.java | 12 +++++--
.../TwitterUserInformationProvider.java | 11 +++++-
.../apache/streams/core/StreamsProvider.java | 38 +++++++++++++++++---
.../test/providers/EmptyResultSetProvider.java | 5 +++
.../test/providers/NumericMessageProvider.java | 5 +++
.../test/component/FileReaderProvider.java | 5 +++
19 files changed, 147 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/899b2e25/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java
----------------------------------------------------------------------