You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by mf...@apache.org on 2014/06/23 18:17:10 UTC
[09/10] git commit: STREAMS-71 | Updated logging
STREAMS-71 | Updated logging
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/4ca2b6cd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/4ca2b6cd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/4ca2b6cd
Branch: refs/heads/master
Commit: 4ca2b6cddbd2a79c9cba04a049ca4d30126d7185
Parents: 2899af4
Author: mfranklin <mf...@apache.org>
Authored: Mon Jun 23 11:32:16 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Mon Jun 23 11:32:16 2014 -0400
----------------------------------------------------------------------
.../sysomos/proessor/SysomosTypeConverter.java | 56 --------------------
.../sysomos/processor/SysomosTypeConverter.java | 56 ++++++++++++++++++++
.../provider/SysomosHeartbeatStream.java | 6 +--
.../sysomos/provider/SysomosProvider.java | 12 +++--
4 files changed, 67 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4ca2b6cd/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/org/apache/streams/sysomos/proessor/SysomosTypeConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/org/apache/streams/sysomos/proessor/SysomosTypeConverter.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/org/apache/streams/sysomos/proessor/SysomosTypeConverter.java
deleted file mode 100644
index 187d402..0000000
--- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/org/apache/streams/sysomos/proessor/SysomosTypeConverter.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.sysomos.org.apache.streams.sysomos.proessor;
-
-import com.google.common.collect.Lists;
-import com.sysomos.xml.BeatApi;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProcessor;
-import org.apache.streams.sysomos.conversion.SysomosBeatActivityConverter;
-
-import java.util.List;
-
-/**
- * Stream processor that converts Sysomos type to Activity
- */
-public class SysomosTypeConverter implements StreamsProcessor {
-
- private SysomosBeatActivityConverter converter;
-
- @Override
- public List<StreamsDatum> process(StreamsDatum entry) {
- if(entry.getDocument() instanceof BeatApi.BeatResponse.Beat) {
- entry.setDocument(converter.convert((BeatApi.BeatResponse.Beat)entry.getDocument()));
- return Lists.newArrayList(entry);
- } else {
- return Lists.newArrayList();
- }
- }
-
- @Override
- public void prepare(Object configurationObject) {
- converter = new SysomosBeatActivityConverter();
- }
-
- @Override
- public void cleanUp() {
- //NOP
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4ca2b6cd/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/processor/SysomosTypeConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/processor/SysomosTypeConverter.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/processor/SysomosTypeConverter.java
new file mode 100644
index 0000000..db9f416
--- /dev/null
+++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/processor/SysomosTypeConverter.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.sysomos.processor;
+
+import com.google.common.collect.Lists;
+import com.sysomos.xml.BeatApi;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.sysomos.conversion.SysomosBeatActivityConverter;
+
+import java.util.List;
+
+/**
+ * Stream processor that converts Sysomos type to Activity
+ */
+public class SysomosTypeConverter implements StreamsProcessor {
+
+ private SysomosBeatActivityConverter converter;
+
+ @Override
+ public List<StreamsDatum> process(StreamsDatum entry) {
+ if(entry.getDocument() instanceof BeatApi.BeatResponse.Beat) {
+ entry.setDocument(converter.convert((BeatApi.BeatResponse.Beat)entry.getDocument()));
+ return Lists.newArrayList(entry);
+ } else {
+ return Lists.newArrayList();
+ }
+ }
+
+ @Override
+ public void prepare(Object configurationObject) {
+ converter = new SysomosBeatActivityConverter();
+ }
+
+ @Override
+ public void cleanUp() {
+ //NOP
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4ca2b6cd/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
index c0bda15..5cc993e 100644
--- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
+++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
@@ -80,7 +80,7 @@ public class SysomosHeartbeatStream implements Runnable {
lastRunTime = DateTime.now();
//Iff we are trying to get to a specific document ID, continue to query after minimum delay
do {
- LOGGER.debug("Querying API to match last ID of {}", lastID);
+ LOGGER.debug("Querying API to match last ID of {} or time range of {} - {}", lastID, afterTime, beforeTime);
result = queryAPI();
//Ensure that we are only assigning lastID to the latest ID, even if there is backfill query.
//Since offset is calcuated at the end of the run, if we detect the need to backfill, it will increment to 1
@@ -92,7 +92,7 @@ public class SysomosHeartbeatStream implements Runnable {
} while (offsetCount > 0);
updateState(result, mostCurrentId);
- LOGGER.debug("Completed current execution with a final docID of {}", lastID);
+ LOGGER.debug("Completed current execution with a final docID of {} or time of {}", lastID, afterTime);
}
protected void updateState(QueryResult result, String mostCurrentId) {
@@ -106,7 +106,7 @@ public class SysomosHeartbeatStream implements Runnable {
if(SysomosProvider.Mode.BACKFILL_AND_TERMINATE.equals(provider.getMode())) {
shutdown();
- LOGGER.info("Completed backfill to {} for heartbeat {}", lastID, heartbeatId);
+ LOGGER.info("Completed backfill to {} for heartbeat {}", OperatingMode.DOC_MATCH.equals(mode) ? lastID : afterTime, heartbeatId);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4ca2b6cd/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java
index 128bc43..1b8f164 100644
--- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java
+++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java
@@ -221,14 +221,18 @@ public class SysomosProvider implements StreamsProvider {
}
protected SysomosHeartbeatStream createStream(String heartbeatId) {
- String beforeTime = addedAfter != null && addedAfter.containsKey(heartbeatId) ? addedAfter.get(heartbeatId) : null;
- String afterTime = addedBefore != null && addedBefore.containsKey(heartbeatId) ? addedBefore.get(heartbeatId) : null;
+ String afterTime = addedAfter != null && addedAfter.containsKey(heartbeatId) ? addedAfter.get(heartbeatId) : null;
+ String beforeTime = addedBefore != null && addedBefore.containsKey(heartbeatId) ? addedBefore.get(heartbeatId) : null;
if(documentIds != null && documentIds.containsKey(heartbeatId)) {
return new SysomosHeartbeatStream(this, heartbeatId, documentIds.get(heartbeatId));
}
- if(afterTime != null || beforeTime != null) {
- return new SysomosHeartbeatStream(this, heartbeatId, RFC3339Utils.parseToUTC(beforeTime), RFC3339Utils.parseToUTC(afterTime));
+ if(afterTime != null) {
+ if(beforeTime != null) {
+ return new SysomosHeartbeatStream(this, heartbeatId, RFC3339Utils.parseToUTC(beforeTime), RFC3339Utils.parseToUTC(afterTime));
+ } else {
+ return new SysomosHeartbeatStream(this, heartbeatId, null, RFC3339Utils.parseToUTC(afterTime));
+ }
}
return new SysomosHeartbeatStream(this, heartbeatId);
}