You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by mf...@apache.org on 2014/06/23 18:17:02 UTC

[01/10] git commit: STREAMS-71 | Added date parameters

Repository: incubator-streams
Updated Branches:
  refs/heads/master 7d6194d36 -> bce1657d4


STREAMS-71 | Added date parameters


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/d4202050
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/d4202050
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/d4202050

Branch: refs/heads/master
Commit: d4202050f8f09e33dd892ceb742b4a5b4b029e54
Parents: ec2ae35
Author: mfranklin <mf...@apache.org>
Authored: Fri Jun 13 13:01:26 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Fri Jun 13 13:01:26 2014 -0400

----------------------------------------------------------------------
 .../sysomos/provider/SysomosProvider.java       | 39 ++++++++++++++++++--
 1 file changed, 35 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d4202050/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java
index 0073ac2..b8b5e56 100644
--- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java
+++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java
@@ -27,6 +27,7 @@ import org.apache.commons.lang.NotImplementedException;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProvider;
 import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.data.util.RFC3339Utils;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -51,10 +52,13 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
  */
 public class SysomosProvider implements StreamsProvider {
 
+
     public static enum Mode { CONTINUOUS, BACKFILL_AND_TERMINATE }
 
     private static final Logger LOGGER = LoggerFactory.getLogger(SysomosProvider.class);
 
+    public static final String ENDING_TIME_KEY = "addedBefore";
+    public static final String STARTING_TIME_KEY = "addedAfter";
     public static final String STREAMS_ID = "SysomosProvider";
     public static final String MODE_KEY = "mode";
     public static final String STARTING_DOCS_KEY = "startingDocs";
@@ -75,6 +79,8 @@ public class SysomosProvider implements StreamsProvider {
     private SysomosConfiguration config;
     private ScheduledExecutorService stream;
     private Map<String, String> documentIds;
+    private Map<String, String> addedBefore;
+    private Map<String, String> addedAfter;
     private Mode mode = Mode.CONTINUOUS;
     private boolean started = false;
 
@@ -118,9 +124,7 @@ public class SysomosProvider implements StreamsProvider {
             LOGGER.trace("Producer not started.  Initializing");
             stream = Executors.newScheduledThreadPool(getConfig().getHeartbeatIds().size() + 1);
             for (String heartbeatId : getConfig().getHeartbeatIds()) {
-                Runnable task = documentIds != null && documentIds.containsKey(heartbeatId) ?
-                        new SysomosHeartbeatStream(this, heartbeatId, documentIds.get(heartbeatId)) :
-                        new SysomosHeartbeatStream(this, heartbeatId);
+                Runnable task = createStream(heartbeatId);
                 stream.scheduleWithFixedDelay(task, 0, this.scheduledLatency, TimeUnit.MILLISECONDS);
                 LOGGER.info("Started producer task for heartbeat {}", heartbeatId);
             }
@@ -211,10 +215,23 @@ public class SysomosProvider implements StreamsProvider {
         while (!success);
     }
 
+    protected SysomosHeartbeatStream createStream(String heartbeatId) {
+        String beforeTime = addedAfter != null && addedAfter.containsKey(heartbeatId) ? addedAfter.get(heartbeatId) : null;
+        String afterTime = addedBefore != null && addedBefore.containsKey(heartbeatId) ? addedBefore.get(heartbeatId) : null;
+
+        if(documentIds != null && documentIds.containsKey(heartbeatId)) {
+            return new SysomosHeartbeatStream(this, heartbeatId, documentIds.get(heartbeatId));
+        }
+        if(afterTime != null || beforeTime != null) {
+            return new SysomosHeartbeatStream(this, heartbeatId, RFC3339Utils.parseToUTC(beforeTime), RFC3339Utils.parseToUTC(afterTime));
+        }
+        return new SysomosHeartbeatStream(this, heartbeatId);
+    }
+
     /**
      * Wait for the queue size to be below threshold before allowing execution to continue on this thread
      */
-    private void pauseForSpace() {
+    protected void pauseForSpace() {
         while(this.providerQueue.size() >= maxQueued) {
             LOGGER.trace("Sleeping the current thread due to a full queue");
             try {
@@ -242,6 +259,20 @@ public class SysomosProvider implements StreamsProvider {
             }
             this.documentIds = (Map)configIds;
         }
+        if(configMap.containsKey(STARTING_TIME_KEY)) {
+            Object configIds = configMap.get(STARTING_TIME_KEY);
+            if(!(configIds instanceof Map)) {
+                throw new IllegalStateException("Invalid configuration.  Added after key must be an instance of Map<String,String> but was " + configIds);
+            }
+            this.addedAfter = (Map)configIds;
+        }
+        if(configMap.containsKey(ENDING_TIME_KEY)) {
+            Object configIds = configMap.get(ENDING_TIME_KEY);
+            if(!(configIds instanceof Map)) {
+                throw new IllegalStateException("Invalid configuration.  Added before key must be an instance of Map<String,String> but was " + configIds);
+            }
+            this.addedBefore = (Map)configIds;
+        }
     }
 
     private Queue<StreamsDatum> constructQueue() {


[06/10] git commit: Merge branch 'master' into STREAMS-71

Posted by mf...@apache.org.
Merge branch 'master' into STREAMS-71


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/78066e6e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/78066e6e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/78066e6e

Branch: refs/heads/master
Commit: 78066e6e458b9977e93a7364531470282fbdcf1c
Parents: 899b2e2 16343c6
Author: mfranklin <mf...@apache.org>
Authored: Mon Jun 23 08:15:29 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Mon Jun 23 08:15:29 2014 -0400

----------------------------------------------------------------------
 .../streams-provider-datasift/pom.xml           |  18 ++-
 .../streams/datasift/csdl/DatasiftCsdlUtil.java | 114 +++++++++++++++++++
 .../streams/jackson/StreamsJacksonMapper.java   |   4 +-
 3 files changed, 133 insertions(+), 3 deletions(-)
----------------------------------------------------------------------



[07/10] git commit: Merge branch 'master' into STREAMS-71

Posted by mf...@apache.org.
Merge branch 'master' into STREAMS-71


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/14a95328
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/14a95328
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/14a95328

Branch: refs/heads/master
Commit: 14a953283edbc78f666bbf1c326ee349b86a92db
Parents: 78066e6 7d6194d
Author: mfranklin <mf...@apache.org>
Authored: Mon Jun 23 09:21:12 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Mon Jun 23 09:21:12 2014 -0400

----------------------------------------------------------------------
 streams-contrib/streams-provider-datasift/pom.xml | 4 ----
 1 file changed, 4 deletions(-)
----------------------------------------------------------------------



[04/10] git commit: STREAMS-71 | Updated to handle multiple modes of backfill

Posted by mf...@apache.org.
STREAMS-71 | Updated to handle multiple modes of backfill


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/409b9993
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/409b9993
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/409b9993

Branch: refs/heads/master
Commit: 409b9993e247ce685e9718a246d65c51f64117d5
Parents: 23fc011
Author: mfranklin <mf...@apache.org>
Authored: Fri Jun 13 14:22:18 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Fri Jun 13 14:22:18 2014 -0400

----------------------------------------------------------------------
 .../provider/SysomosHeartbeatStream.java        | 45 +++++++++++++++-----
 1 file changed, 34 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/409b9993/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
index c234cb1..c0bda15 100644
--- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
+++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
@@ -44,6 +44,7 @@ public class SysomosHeartbeatStream implements Runnable {
     private String lastID;
     private DateTime beforeTime;
     private DateTime afterTime;
+    private DateTime lastRunTime;
     private int offsetCount = 0;
     private boolean enabled = true;
 
@@ -76,6 +77,7 @@ public class SysomosHeartbeatStream implements Runnable {
     public void run() {
         QueryResult result;
         String mostCurrentId = null;
+        lastRunTime = DateTime.now();
         //Iff we are trying to get to a specific document ID, continue to query after minimum delay
         do {
             LOGGER.debug("Querying API to match last ID of {}", lastID);
@@ -85,17 +87,22 @@ public class SysomosHeartbeatStream implements Runnable {
             if(offsetCount == 1) {
                 mostCurrentId = result.getCurrentId();
             }
+            updateOffset(result);
             sleep();
-        } while (shouldBackfill(result));
-        updateState(result, mostCurrentId);
+        } while (offsetCount > 0);
 
+        updateState(result, mostCurrentId);
         LOGGER.debug("Completed current execution with a final docID of {}", lastID);
     }
 
     protected void updateState(QueryResult result, String mostCurrentId) {
-        //Set the last ID so that the next time we are executed we will continue to query only so long as we haven't
-        //found the specific ID
-        lastID = mostCurrentId == null ? result.getCurrentId() : mostCurrentId;
+        if(OperatingMode.DOC_MATCH.equals(mode)) {
+            //Set the last ID so that the next time we are executed we will continue to query only so long as we haven't
+            //found the specific ID
+            lastID = mostCurrentId == null ? result.getCurrentId() : mostCurrentId;
+        } else {
+            afterTime = lastRunTime;
+        }
 
         if(SysomosProvider.Mode.BACKFILL_AND_TERMINATE.equals(provider.getMode())) {
             shutdown();
@@ -103,8 +110,13 @@ public class SysomosHeartbeatStream implements Runnable {
         }
     }
 
-    protected boolean shouldBackfill(QueryResult result) {
-        return lastID != null && !result.isMatchedLastId();
+    protected void updateOffset(QueryResult result) {
+        if(OperatingMode.DOC_MATCH.equals(mode)) {
+            //Reset the offset iff we have found a match or this is the first execution
+            offsetCount = lastID == null || result.isMatchedLastId() ? 0 : offsetCount + 1;
+        } else {
+            offsetCount = result.getResponseSize() == 0 ? 0 : offsetCount + 1;
+        }
     }
 
     protected void sleep() {
@@ -121,6 +133,7 @@ public class SysomosHeartbeatStream implements Runnable {
 
         String currentId = null;
         boolean matched = false;
+        int responseSize = 0;
         if(response != null) {
             for (BeatApi.BeatResponse.Beat beat : response.getBeat()) {
                 String docId = beat.getDocid();
@@ -137,10 +150,9 @@ public class SysomosHeartbeatStream implements Runnable {
                 item.getMetadata().put("heartbeat", this.heartbeatId);
                 this.provider.enqueueItem(item);
             }
-            //Reset the offset iff we have found a match or this is the first execution
-            offsetCount = lastID == null || matched ? 0 : offsetCount + 1;
+            responseSize = response.getCount();
         }
-        return new QueryResult(matched, currentId);
+        return new QueryResult(matched, currentId, responseSize);
     }
 
     protected BeatApi.BeatResponse executeAPIRequest() {
@@ -175,10 +187,13 @@ public class SysomosHeartbeatStream implements Runnable {
     protected class QueryResult {
         private boolean matchedLastId;
         private String currentId;
+        private int responseSize;
 
-        private QueryResult(boolean matchedLastId, String currentId) {
+
+        public QueryResult(boolean matchedLastId, String currentId, int responseSize) {
             this.matchedLastId = matchedLastId;
             this.currentId = currentId;
+            this.responseSize = responseSize;
         }
 
         public boolean isMatchedLastId() {
@@ -196,5 +211,13 @@ public class SysomosHeartbeatStream implements Runnable {
         public void setCurrentId(String currentId) {
             this.currentId = currentId;
         }
+
+        public int getResponseSize() {
+            return responseSize;
+        }
+
+        public void setResponseSize(int responseSize) {
+            this.responseSize = responseSize;
+        }
     }
 }


[08/10] git commit: Fixed error where test sources were set to java17 by pom instead of profile

Posted by mf...@apache.org.
Fixed error where test sources were set to java17 by pom instead of profile


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/2899af4f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/2899af4f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/2899af4f

Branch: refs/heads/master
Commit: 2899af4f0e22818dcd8cf60ad02ecec8d6078cd4
Parents: 14a9532
Author: mfranklin <mf...@apache.org>
Authored: Mon Jun 23 10:12:16 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Mon Jun 23 10:12:16 2014 -0400

----------------------------------------------------------------------
 streams-contrib/streams-provider-datasift/pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2899af4f/streams-contrib/streams-provider-datasift/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/pom.xml b/streams-contrib/streams-provider-datasift/pom.xml
index 3b9f96f..5c5f674 100644
--- a/streams-contrib/streams-provider-datasift/pom.xml
+++ b/streams-contrib/streams-provider-datasift/pom.xml
@@ -83,7 +83,7 @@
 
     <build>
         <sourceDirectory>src/main/java</sourceDirectory>
-        <testSourceDirectory>src/test/java17</testSourceDirectory>
+        <testSourceDirectory>src/test/java</testSourceDirectory>
         <resources>
             <resource>
                 <directory>src/main/resources</directory>


[09/10] git commit: STREAMS-71 | Updated logging

Posted by mf...@apache.org.
STREAMS-71 | Updated logging


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/4ca2b6cd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/4ca2b6cd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/4ca2b6cd

Branch: refs/heads/master
Commit: 4ca2b6cddbd2a79c9cba04a049ca4d30126d7185
Parents: 2899af4
Author: mfranklin <mf...@apache.org>
Authored: Mon Jun 23 11:32:16 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Mon Jun 23 11:32:16 2014 -0400

----------------------------------------------------------------------
 .../sysomos/proessor/SysomosTypeConverter.java  | 56 --------------------
 .../sysomos/processor/SysomosTypeConverter.java | 56 ++++++++++++++++++++
 .../provider/SysomosHeartbeatStream.java        |  6 +--
 .../sysomos/provider/SysomosProvider.java       | 12 +++--
 4 files changed, 67 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4ca2b6cd/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/org/apache/streams/sysomos/proessor/SysomosTypeConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/org/apache/streams/sysomos/proessor/SysomosTypeConverter.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/org/apache/streams/sysomos/proessor/SysomosTypeConverter.java
deleted file mode 100644
index 187d402..0000000
--- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/org/apache/streams/sysomos/proessor/SysomosTypeConverter.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.sysomos.org.apache.streams.sysomos.proessor;
-
-import com.google.common.collect.Lists;
-import com.sysomos.xml.BeatApi;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProcessor;
-import org.apache.streams.sysomos.conversion.SysomosBeatActivityConverter;
-
-import java.util.List;
-
-/**
- * Stream processor that converts Sysomos type to Activity
- */
-public class SysomosTypeConverter implements StreamsProcessor {
-
-    private SysomosBeatActivityConverter converter;
-
-    @Override
-    public List<StreamsDatum> process(StreamsDatum entry) {
-        if(entry.getDocument() instanceof BeatApi.BeatResponse.Beat) {
-            entry.setDocument(converter.convert((BeatApi.BeatResponse.Beat)entry.getDocument()));
-            return Lists.newArrayList(entry);
-        } else {
-            return Lists.newArrayList();
-        }
-    }
-
-    @Override
-    public void prepare(Object configurationObject) {
-        converter = new SysomosBeatActivityConverter();
-    }
-
-    @Override
-    public void cleanUp() {
-        //NOP
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4ca2b6cd/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/processor/SysomosTypeConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/processor/SysomosTypeConverter.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/processor/SysomosTypeConverter.java
new file mode 100644
index 0000000..db9f416
--- /dev/null
+++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/processor/SysomosTypeConverter.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.sysomos.processor;
+
+import com.google.common.collect.Lists;
+import com.sysomos.xml.BeatApi;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.sysomos.conversion.SysomosBeatActivityConverter;
+
+import java.util.List;
+
+/**
+ * Stream processor that converts Sysomos type to Activity
+ */
+public class SysomosTypeConverter implements StreamsProcessor {
+
+    private SysomosBeatActivityConverter converter;
+
+    @Override
+    public List<StreamsDatum> process(StreamsDatum entry) {
+        if(entry.getDocument() instanceof BeatApi.BeatResponse.Beat) {
+            entry.setDocument(converter.convert((BeatApi.BeatResponse.Beat)entry.getDocument()));
+            return Lists.newArrayList(entry);
+        } else {
+            return Lists.newArrayList();
+        }
+    }
+
+    @Override
+    public void prepare(Object configurationObject) {
+        converter = new SysomosBeatActivityConverter();
+    }
+
+    @Override
+    public void cleanUp() {
+        //NOP
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4ca2b6cd/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
index c0bda15..5cc993e 100644
--- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
+++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
@@ -80,7 +80,7 @@ public class SysomosHeartbeatStream implements Runnable {
         lastRunTime = DateTime.now();
         //Iff we are trying to get to a specific document ID, continue to query after minimum delay
         do {
-            LOGGER.debug("Querying API to match last ID of {}", lastID);
+            LOGGER.debug("Querying API to match last ID of {} or time range of {} - {}", lastID, afterTime, beforeTime);
             result = queryAPI();
             //Ensure that we are only assigning lastID to the latest ID, even if there is backfill query.
             //Since offset is calcuated at the end of the run, if we detect the need to backfill, it will increment to 1
@@ -92,7 +92,7 @@ public class SysomosHeartbeatStream implements Runnable {
         } while (offsetCount > 0);
 
         updateState(result, mostCurrentId);
-        LOGGER.debug("Completed current execution with a final docID of {}", lastID);
+        LOGGER.debug("Completed current execution with a final docID of {} or time of {}", lastID, afterTime);
     }
 
     protected void updateState(QueryResult result, String mostCurrentId) {
@@ -106,7 +106,7 @@ public class SysomosHeartbeatStream implements Runnable {
 
         if(SysomosProvider.Mode.BACKFILL_AND_TERMINATE.equals(provider.getMode())) {
             shutdown();
-            LOGGER.info("Completed backfill to {} for heartbeat {}", lastID, heartbeatId);
+            LOGGER.info("Completed backfill to {} for heartbeat {}", OperatingMode.DOC_MATCH.equals(mode) ? lastID : afterTime, heartbeatId);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4ca2b6cd/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java
index 128bc43..1b8f164 100644
--- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java
+++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java
@@ -221,14 +221,18 @@ public class SysomosProvider implements StreamsProvider {
     }
 
     protected SysomosHeartbeatStream createStream(String heartbeatId) {
-        String beforeTime = addedAfter != null && addedAfter.containsKey(heartbeatId) ? addedAfter.get(heartbeatId) : null;
-        String afterTime = addedBefore != null && addedBefore.containsKey(heartbeatId) ? addedBefore.get(heartbeatId) : null;
+        String afterTime = addedAfter != null && addedAfter.containsKey(heartbeatId) ? addedAfter.get(heartbeatId) : null;
+        String beforeTime = addedBefore != null && addedBefore.containsKey(heartbeatId) ? addedBefore.get(heartbeatId) : null;
 
         if(documentIds != null && documentIds.containsKey(heartbeatId)) {
             return new SysomosHeartbeatStream(this, heartbeatId, documentIds.get(heartbeatId));
         }
-        if(afterTime != null || beforeTime != null) {
-            return new SysomosHeartbeatStream(this, heartbeatId, RFC3339Utils.parseToUTC(beforeTime), RFC3339Utils.parseToUTC(afterTime));
+        if(afterTime != null) {
+            if(beforeTime != null) {
+                return new SysomosHeartbeatStream(this, heartbeatId, RFC3339Utils.parseToUTC(beforeTime), RFC3339Utils.parseToUTC(afterTime));
+            } else {
+                return new SysomosHeartbeatStream(this, heartbeatId, null, RFC3339Utils.parseToUTC(afterTime));
+            }
         }
         return new SysomosHeartbeatStream(this, heartbeatId);
     }


[10/10] git commit: STREAMS-71 | Updated time format

Posted by mf...@apache.org.
STREAMS-71 | Updated time format


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/bce1657d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/bce1657d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/bce1657d

Branch: refs/heads/master
Commit: bce1657d413606503ede59437c107e63de38f3d6
Parents: 4ca2b6c
Author: mfranklin <mf...@apache.org>
Authored: Mon Jun 23 12:13:03 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Mon Jun 23 12:13:03 2014 -0400

----------------------------------------------------------------------
 .../main/java/org/apache/streams/sysomos/util/SysomosUtils.java    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/bce1657d/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/util/SysomosUtils.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/util/SysomosUtils.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/util/SysomosUtils.java
index 338ce8f..3b6a843 100644
--- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/util/SysomosUtils.java
+++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/util/SysomosUtils.java
@@ -41,7 +41,7 @@ import java.util.regex.Pattern;
 public class SysomosUtils {
 
     public static final Pattern CODE_PATTERN = Pattern.compile("code: ([0-9]+)");
-    public static final DateTimeFormatter SYSOMOS_DATE_FORMATTER = DateTimeFormat.forPattern("yyyy-MM-dd'T'hh:mm:ssZ");
+    public static final DateTimeFormatter SYSOMOS_DATE_FORMATTER = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss'Z'").withZoneUTC();
     private final static Logger LOGGER = LoggerFactory.getLogger(SysomosUtils.class);
 
     private SysomosUtils() {}


[02/10] git commit: STREAMS-71 | Updated config to use date parameters

Posted by mf...@apache.org.
STREAMS-71 | Updated config to use date parameters


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/7afc27cf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/7afc27cf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/7afc27cf

Branch: refs/heads/master
Commit: 7afc27cf9d42b36aec343ac0f8be2626ed269671
Parents: d420205
Author: mfranklin <mf...@apache.org>
Authored: Fri Jun 13 13:01:45 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Fri Jun 13 13:01:45 2014 -0400

----------------------------------------------------------------------
 .../provider/SysomosHeartbeatStream.java        | 34 +++++++++++++++++---
 1 file changed, 29 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afc27cf/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
index 2a40450..f6f4f29 100644
--- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
+++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
@@ -21,6 +21,7 @@ package org.apache.streams.sysomos.provider;
 
 import com.sysomos.xml.BeatApi;
 import org.apache.streams.core.StreamsDatum;
+import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -29,6 +30,8 @@ import org.slf4j.LoggerFactory;
  */
 public class SysomosHeartbeatStream implements Runnable {
 
+    private static enum OperatingMode { DATE, DOC_MATCH}
+
     private final static Logger LOGGER = LoggerFactory.getLogger(SysomosHeartbeatStream.class);
 
     private final SysomosProvider provider;
@@ -36,23 +39,37 @@ public class SysomosHeartbeatStream implements Runnable {
     private final String heartbeatId;
     private final long maxApiBatch;
     private final long minLatency;
+    private final OperatingMode mode;
 
     private String lastID;
+    private DateTime beforeTime;
+    private DateTime afterTime;
     private int offsetCount = 0;
     private boolean enabled = true;
 
     public SysomosHeartbeatStream(SysomosProvider provider, String heartbeatId) {
-        this(provider, heartbeatId, null);
+        this(provider, heartbeatId, OperatingMode.DATE);
+    }
+
+    public SysomosHeartbeatStream(SysomosProvider provider, String heartbeatId, DateTime beforeTime, DateTime afterTime) {
+        this(provider, heartbeatId, OperatingMode.DATE);
+        this.beforeTime = beforeTime;
+        this.afterTime = afterTime;
     }
 
-    public SysomosHeartbeatStream(SysomosProvider provider, String heartbeatId, String docId) {
+    public SysomosHeartbeatStream(SysomosProvider provider, String heartbeatId, String documentId) {
+        this(provider, heartbeatId, OperatingMode.DOC_MATCH);
+        this.lastID = documentId;
+    }
+
+    public SysomosHeartbeatStream(SysomosProvider provider, String heartbeatId, OperatingMode mode) {
         this.provider = provider;
         this.heartbeatId = heartbeatId;
-        this.lastID = docId;
 
         this.client = provider.getClient();
         this.maxApiBatch = provider.getMaxApiBatch();
         this.minLatency = provider.getMinLatency();
+        this.mode = mode;
     }
 
     @Override
@@ -94,10 +111,17 @@ public class SysomosHeartbeatStream implements Runnable {
         BeatApi.BeatResponse response = null;
         try {
             if(enabled) {
-                response = this.client.createRequestBuilder()
+                RequestBuilder requestBuilder = this.client.createRequestBuilder()
                         .setHeartBeatId(heartbeatId)
                         .setOffset(offsetCount * maxApiBatch)
-                        .setReturnSetSize(maxApiBatch).execute();
+                        .setReturnSetSize(maxApiBatch);
+                if(beforeTime != null) {
+                    requestBuilder.setAddedBeforeDate(beforeTime);
+                }
+                if(afterTime != null) {
+                    requestBuilder.setAddedAfterDate(afterTime);
+                }
+                response = requestBuilder.execute();
 
                 LOGGER.debug("Received {} results from API query", response.getCount());
             }


[03/10] git commit: STREAMS-71 | Updated organization for easier update of operation

Posted by mf...@apache.org.
STREAMS-71 | Updated organization for easier update of operation


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/23fc0119
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/23fc0119
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/23fc0119

Branch: refs/heads/master
Commit: 23fc01199933a97be8414826f91eca12174b3789
Parents: 7afc27c
Author: mfranklin <mf...@apache.org>
Authored: Fri Jun 13 13:08:31 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Fri Jun 13 13:08:31 2014 -0400

----------------------------------------------------------------------
 .../provider/SysomosHeartbeatStream.java        | 73 ++++++++++++--------
 1 file changed, 46 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/23fc0119/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
index f6f4f29..c234cb1 100644
--- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
+++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
@@ -79,24 +79,32 @@ public class SysomosHeartbeatStream implements Runnable {
         //Iff we are trying to get to a specific document ID, continue to query after minimum delay
         do {
             LOGGER.debug("Querying API to match last ID of {}", lastID);
-            result = executeAPIRequest();
+            result = queryAPI();
             //Ensure that we are only assigning lastID to the latest ID, even if there is backfill query.
             //Since offset is calcuated at the end of the run, if we detect the need to backfill, it will increment to 1
             if(offsetCount == 1) {
                 mostCurrentId = result.getCurrentId();
             }
             sleep();
-        } while (lastID != null && !result.isMatchedLastId());
+        } while (shouldBackfill(result));
+        updateState(result, mostCurrentId);
+
+        LOGGER.debug("Completed current execution with a final docID of {}", lastID);
+    }
+
+    protected void updateState(QueryResult result, String mostCurrentId) {
         //Set the last ID so that the next time we are executed we will continue to query only so long as we haven't
         //found the specific ID
         lastID = mostCurrentId == null ? result.getCurrentId() : mostCurrentId;
 
         if(SysomosProvider.Mode.BACKFILL_AND_TERMINATE.equals(provider.getMode())) {
-            provider.signalComplete(heartbeatId);
-            enabled = false;
+            shutdown();
             LOGGER.info("Completed backfill to {} for heartbeat {}", lastID, heartbeatId);
         }
-        LOGGER.debug("Completed current execution with a final docID of {}", lastID);
+    }
+
+    protected boolean shouldBackfill(QueryResult result) {
+        return lastID != null && !result.isMatchedLastId();
     }
 
     protected void sleep() {
@@ -104,30 +112,12 @@ public class SysomosHeartbeatStream implements Runnable {
             Thread.sleep(this.minLatency);
         } catch (InterruptedException e) {
             LOGGER.warn("Thread interrupted while sleeping minimum delay", e);
+            shutdown();
         }
     }
 
-    protected QueryResult executeAPIRequest() {
-        BeatApi.BeatResponse response = null;
-        try {
-            if(enabled) {
-                RequestBuilder requestBuilder = this.client.createRequestBuilder()
-                        .setHeartBeatId(heartbeatId)
-                        .setOffset(offsetCount * maxApiBatch)
-                        .setReturnSetSize(maxApiBatch);
-                if(beforeTime != null) {
-                    requestBuilder.setAddedBeforeDate(beforeTime);
-                }
-                if(afterTime != null) {
-                    requestBuilder.setAddedAfterDate(afterTime);
-                }
-                response = requestBuilder.execute();
-
-                LOGGER.debug("Received {} results from API query", response.getCount());
-            }
-        } catch (Exception e) {
-            LOGGER.warn("Error querying Sysomos API", e);
-        }
+    protected QueryResult queryAPI() {
+        BeatApi.BeatResponse response = executeAPIRequest();
 
         String currentId = null;
         boolean matched = false;
@@ -153,7 +143,36 @@ public class SysomosHeartbeatStream implements Runnable {
         return new QueryResult(matched, currentId);
     }
 
-    private class QueryResult {
+    protected BeatApi.BeatResponse executeAPIRequest() {
+        BeatApi.BeatResponse response = null;
+        try {
+            if(enabled) {
+                RequestBuilder requestBuilder = this.client.createRequestBuilder()
+                        .setHeartBeatId(heartbeatId)
+                        .setOffset(offsetCount * maxApiBatch)
+                        .setReturnSetSize(maxApiBatch);
+                if(beforeTime != null) {
+                    requestBuilder.setAddedBeforeDate(beforeTime);
+                }
+                if(afterTime != null) {
+                    requestBuilder.setAddedAfterDate(afterTime);
+                }
+                response = requestBuilder.execute();
+
+                LOGGER.debug("Received {} results from API query", response.getCount());
+            }
+        } catch (Exception e) {
+            LOGGER.warn("Error querying Sysomos API", e);
+        }
+        return response;
+    }
+
+    protected void shutdown() {
+        provider.signalComplete(heartbeatId);
+        enabled = false;
+    }
+
+    protected class QueryResult {
         private boolean matchedLastId;
         private String currentId;
 


[05/10] git commit: Merge branch 'master' into STREAMS-71

Posted by mf...@apache.org.
Merge branch 'master' into STREAMS-71


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/899b2e25
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/899b2e25
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/899b2e25

Branch: refs/heads/master
Commit: 899b2e25a130273f5a054dabd92f7e350c31e6a4
Parents: 409b999 e73d2ad
Author: mfranklin <mf...@apache.org>
Authored: Fri Jun 20 15:24:42 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Fri Jun 20 15:24:42 2014 -0400

----------------------------------------------------------------------
 .../org/apache/streams/s3/S3PersistReader.java  |  9 ++++-
 .../streams/console/ConsolePersistReader.java   |  5 +++
 .../ElasticsearchPersistReader.java             |  8 ++++-
 .../streams/hdfs/WebHdfsPersistReader.java      |  9 ++++-
 .../streams/kafka/KafkaPersistReader.java       |  5 +++
 .../provider/DatasiftStreamProvider.java        |  5 +++
 .../google/gmail/provider/GMailProvider.java    |  8 ++++-
 .../google/gplus/provider/GPlusProvider.java    |  5 +++
 .../streams/data/moreover/MoreoverProvider.java |  5 +++
 .../streams/rss/provider/RssStreamProvider.java |  6 ++++
 .../sysomos/provider/SysomosProvider.java       |  7 +++-
 .../processor/TwitterProfileProcessor.java      | 13 +++----
 .../twitter/provider/TwitterStreamProvider.java |  5 +++
 .../provider/TwitterTimelineProvider.java       | 12 +++++--
 .../TwitterUserInformationProvider.java         | 11 +++++-
 .../apache/streams/core/StreamsProvider.java    | 38 +++++++++++++++++---
 .../test/providers/EmptyResultSetProvider.java  |  5 +++
 .../test/providers/NumericMessageProvider.java  |  5 +++
 .../test/component/FileReaderProvider.java      |  5 +++
 19 files changed, 147 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/899b2e25/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java
----------------------------------------------------------------------