You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by nn...@apache.org on 2020/03/03 22:31:34 UTC

[geode-kafka-connector] branch master updated: Added some logging statements.

This is an automated email from the ASF dual-hosted git repository.

nnag pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/geode-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new a713123  Added some logging statements.
a713123 is described below

commit a713123163c10f4556896105c2a4b3e74eb6d4f2
Author: Naburun Nag <na...@cs.wisc.edu>
AuthorDate: Tue Mar 3 14:30:56 2020 -0800

    Added some logging statements.
---
 src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java     | 2 ++
 src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java | 4 ++++
 2 files changed, 6 insertions(+)

diff --git a/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java b/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java
index c0fde75..700688e 100644
--- a/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java
+++ b/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java
@@ -56,6 +56,7 @@ public class GeodeKafkaSinkTask extends SinkTask {
 
   @Override
   public void start(Map<String, String> props) {
+    logger.info("Starting Apache Geode sink task");
     try {
       GeodeSinkConnectorConfig geodeConnectorConfig = new GeodeSinkConnectorConfig(props);
       configure(geodeConnectorConfig);
@@ -142,6 +143,7 @@ public class GeodeKafkaSinkTask extends SinkTask {
 
   @Override
   public void stop() {
+    logger.info("Stopping task");
     geodeContext.close(false);
   }
 
diff --git a/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java b/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java
index a9edf95..d53a9e9 100644
--- a/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java
+++ b/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java
@@ -67,6 +67,7 @@ public class GeodeKafkaSourceTask extends SourceTask {
 
   @Override
   public void start(Map<String, String> props) {
+    logger.info("Starting Apache Geode source task");
     try {
       GeodeSourceConnectorConfig geodeConnectorConfig = new GeodeSourceConnectorConfig(props);
       logger.debug("GeodeKafkaSourceTask id:" + geodeConnectorConfig.getTaskId() + " starting");
@@ -89,6 +90,7 @@ public class GeodeKafkaSourceTask extends SourceTask {
       boolean loadEntireRegion = geodeConnectorConfig.getLoadEntireRegion();
       installOnGeode(geodeConnectorConfig, geodeContext, eventBufferSupplier, cqPrefix,
           loadEntireRegion);
+      logger.info("Started Apache Geode source task");
     } catch (Exception e) {
       e.printStackTrace();
       logger.error("Unable to start source task", e);
@@ -98,6 +100,7 @@ public class GeodeKafkaSourceTask extends SourceTask {
 
   @Override
   public List<SourceRecord> poll() {
+    logger.trace("Polling for new data");
     ArrayList<SourceRecord> records = new ArrayList<>(batchSize);
     ArrayList<GeodeEvent> events = new ArrayList<>(batchSize);
     if (eventBufferSupplier.get().drainTo(events, batchSize) > 0) {
@@ -117,6 +120,7 @@ public class GeodeKafkaSourceTask extends SourceTask {
 
   @Override
   public void stop() {
+    logger.info("Stopping Apache Geode source task");
     geodeContext.close(true);
   }