You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by nn...@apache.org on 2020/03/03 22:31:34 UTC
[geode-kafka-connector] branch master updated: Added some logging statements.
This is an automated email from the ASF dual-hosted git repository.
nnag pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/geode-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push:
new a713123 Added some logging statements.
a713123 is described below
commit a713123163c10f4556896105c2a4b3e74eb6d4f2
Author: Naburun Nag <na...@cs.wisc.edu>
AuthorDate: Tue Mar 3 14:30:56 2020 -0800
Added some logging statements.
---
src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java | 2 ++
src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java | 4 ++++
2 files changed, 6 insertions(+)
diff --git a/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java b/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java
index c0fde75..700688e 100644
--- a/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java
+++ b/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java
@@ -56,6 +56,7 @@ public class GeodeKafkaSinkTask extends SinkTask {
@Override
public void start(Map<String, String> props) {
+ logger.info("Starting Apache Geode sink task");
try {
GeodeSinkConnectorConfig geodeConnectorConfig = new GeodeSinkConnectorConfig(props);
configure(geodeConnectorConfig);
@@ -142,6 +143,7 @@ public class GeodeKafkaSinkTask extends SinkTask {
@Override
public void stop() {
+ logger.info("Stopping task");
geodeContext.close(false);
}
diff --git a/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java b/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java
index a9edf95..d53a9e9 100644
--- a/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java
+++ b/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java
@@ -67,6 +67,7 @@ public class GeodeKafkaSourceTask extends SourceTask {
@Override
public void start(Map<String, String> props) {
+ logger.info("Starting Apache Geode source task");
try {
GeodeSourceConnectorConfig geodeConnectorConfig = new GeodeSourceConnectorConfig(props);
logger.debug("GeodeKafkaSourceTask id:" + geodeConnectorConfig.getTaskId() + " starting");
@@ -89,6 +90,7 @@ public class GeodeKafkaSourceTask extends SourceTask {
boolean loadEntireRegion = geodeConnectorConfig.getLoadEntireRegion();
installOnGeode(geodeConnectorConfig, geodeContext, eventBufferSupplier, cqPrefix,
loadEntireRegion);
+ logger.info("Started Apache Geode source task");
} catch (Exception e) {
e.printStackTrace();
logger.error("Unable to start source task", e);
@@ -98,6 +100,7 @@ public class GeodeKafkaSourceTask extends SourceTask {
@Override
public List<SourceRecord> poll() {
+ logger.trace("Polling for new data");
ArrayList<SourceRecord> records = new ArrayList<>(batchSize);
ArrayList<GeodeEvent> events = new ArrayList<>(batchSize);
if (eventBufferSupplier.get().drainTo(events, batchSize) > 0) {
@@ -117,6 +120,7 @@ public class GeodeKafkaSourceTask extends SourceTask {
@Override
public void stop() {
+ logger.info("Stopping Apache Geode source task");
geodeContext.close(true);
}