Posted to commits@geode.apache.org by ja...@apache.org on 2020/03/02 19:07:23 UTC

[geode-kafka-connector] branch master updated: moved cache creation errors to GeodeContext; added info logging; removed some debug logging

This is an automated email from the ASF dual-hosted git repository.

jasonhuynh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/geode-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new a22b76e  moved cache creation errors to GeodeContext; added info logging; removed some debug logging
a22b76e is described below

commit a22b76ec52ca55d479f1f8bca9774a7578f5afa6
Author: Jason Huynh <hu...@gmail.com>
AuthorDate: Mon Mar 2 11:06:06 2020 -0800

    moved cache creation errors to GeodeContext
    added info logging
    removed some debug logging
---
 .../apache/geode/kafka/GeodeConnectorConfig.java   |  1 -
 .../java/org/apache/geode/kafka/GeodeContext.java  | 47 +++++++++++++---------
 .../geode/kafka/sink/GeodeKafkaSinkTask.java       |  9 +----
 .../kafka/source/GeodeKafkaSourceListener.java     |  3 +-
 .../geode/kafka/source/GeodeKafkaSourceTask.java   | 12 ++----
 5 files changed, 35 insertions(+), 37 deletions(-)
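
The net effect of this change: the null check that each task used to perform
after connectClient(...) now lives inside GeodeContext, which throws a
ConnectException itself if the client cache cannot be created. A minimal
sketch of the resulting calling pattern, using the signatures visible in the
diffs below (the variable names here are illustrative, not from the commit):

    GeodeContext geodeContext = new GeodeContext();
    // connectClient(...) now either succeeds or throws ConnectException from
    // inside GeodeContext, so callers no longer null-check the returned cache.
    geodeContext.connectClient(locatorHostPorts,   // List<LocatorHostPort>
        securityClientAuthInit, securityUserName, securityPassword,
        usesSecurity);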

diff --git a/src/main/java/org/apache/geode/kafka/GeodeConnectorConfig.java b/src/main/java/org/apache/geode/kafka/GeodeConnectorConfig.java
index 586817a..3c218d4 100644
--- a/src/main/java/org/apache/geode/kafka/GeodeConnectorConfig.java
+++ b/src/main/java/org/apache/geode/kafka/GeodeConnectorConfig.java
@@ -66,7 +66,6 @@ public class GeodeConnectorConfig extends AbstractConfig {
     securityUserName = getString(SECURITY_USER);
     securityPassword = getPassword(SECURITY_PASSWORD);
     securityClientAuthInit = getPassword(SECURITY_CLIENT_AUTH_INIT);
-//    System.out.println(securityUserName + "NABA " + securityPassword.value() + "NABA" + securityClientAuthInit.value());
     // if we registered a username/password instead of auth init, we should use the default auth
     // init if one isn't specified
     if (usesSecurity()) {
diff --git a/src/main/java/org/apache/geode/kafka/GeodeContext.java b/src/main/java/org/apache/geode/kafka/GeodeContext.java
index 418a476..5e377e3 100644
--- a/src/main/java/org/apache/geode/kafka/GeodeContext.java
+++ b/src/main/java/org/apache/geode/kafka/GeodeContext.java
@@ -61,28 +61,33 @@ public class GeodeContext {
   public ClientCache createClientCache(List<LocatorHostPort> locators, String durableClientName,
                                        String durableClientTimeOut, String securityAuthInit, String securityUserName,
                                        String securityPassword, boolean usesSecurity) {
-    ClientCacheFactory ccf = new ClientCacheFactory();
-
-    ccf.setPdxReadSerialized(true);
-    if (usesSecurity) {
-      if (securityUserName != null && securityPassword != null) {
-        ccf.set(SECURITY_USER, securityUserName);
-        ccf.set(SECURITY_PASSWORD, securityPassword);
+    try {
+      ClientCacheFactory ccf = new ClientCacheFactory();
+
+      ccf.setPdxReadSerialized(true);
+      if (usesSecurity) {
+        if (securityUserName != null && securityPassword != null) {
+          ccf.set(SECURITY_USER, securityUserName);
+          ccf.set(SECURITY_PASSWORD, securityPassword);
+        }
+        ccf.set(SECURITY_CLIENT_AUTH_INIT, securityAuthInit);
       }
-      ccf.set(SECURITY_CLIENT_AUTH_INIT, securityAuthInit);
-    }
-    if (!durableClientName.equals("")) {
-      ccf.set("durable-client-id", durableClientName)
-          .set("durable-client-timeout", durableClientTimeOut);
-    }
-    // currently we only allow using the default pool.
-    // If we ever want to allow adding multiple pools we'll have to configure pool factories
-    ccf.setPoolSubscriptionEnabled(true);
+      if (!durableClientName.equals("")) {
+        ccf.set("durable-client-id", durableClientName)
+                .set("durable-client-timeout", durableClientTimeOut);
+      }
+      // currently we only allow using the default pool.
+      // If we ever want to allow adding multiple pools we'll have to configure pool factories
+      ccf.setPoolSubscriptionEnabled(true);
 
-    for (LocatorHostPort locator : locators) {
-      ccf.addPoolLocator(locator.getHostName(), locator.getPort());
+      for (LocatorHostPort locator : locators) {
+        ccf.addPoolLocator(locator.getHostName(), locator.getPort());
+      }
+      return ccf.create();
+    } catch (Exception e) {
+      throw new ConnectException(
+          "Unable to create a client cache connected to the Apache Geode cluster", e);
     }
-    return ccf.create();
   }
 
   public CqQuery newCq(String name, String query, CqAttributes cqAttributes, boolean isDurable)
@@ -106,4 +111,8 @@ public class GeodeContext {
       throw new ConnectException(e);
     }
   }
+
+  public void close(boolean keepAlive) {
+    clientCache.close(keepAlive);
+  }
 }
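
GeodeContext also gains a close(boolean keepAlive) wrapper so the tasks no
longer reach through getClientCache(). The flag maps straight onto Geode's
ClientCache.close(keepAlive): with true, the servers keep a durable client's
subscription queue so events are not lost across a restart. A sketch of how
the two tasks below use it (the comments are mine, not from the commit):

    geodeContext.close(false); // sink task: nothing durable to preserve
    geodeContext.close(true);  // source task: keep the durable CQ queue on the servers
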
diff --git a/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java b/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java
index 1688eb7..ef95450 100644
--- a/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java
+++ b/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java
@@ -61,16 +61,11 @@ public class GeodeKafkaSinkTask extends SinkTask {
       GeodeSinkConnectorConfig geodeConnectorConfig = new GeodeSinkConnectorConfig(props);
       configure(geodeConnectorConfig);
       geodeContext = new GeodeContext();
-      final ClientCache clientCache =
-          geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(),
+      geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(),
               geodeConnectorConfig.getSecurityClientAuthInit(),
               geodeConnectorConfig.getSecurityUserName(),
               geodeConnectorConfig.getSecurityPassword(),
               geodeConnectorConfig.usesSecurity());
-      if (clientCache == null) {
-        throw new ConnectException(
-            "Unable to create a client cache connected to the Apache Geode cluster");
-      }
       regionNameToRegion = createProxyRegions(topicToRegions.values());
     } catch (Exception e) {
       logger.error("Unable to start sink task", e);
@@ -149,7 +144,7 @@ public class GeodeKafkaSinkTask extends SinkTask {
 
   @Override
   public void stop() {
-    geodeContext.getClientCache().close(false);
+    geodeContext.close(false);
   }
 
 }
diff --git a/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceListener.java b/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceListener.java
index 8a54766..f2f3142 100644
--- a/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceListener.java
+++ b/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceListener.java
@@ -51,7 +51,7 @@ class GeodeKafkaSourceListener implements CqStatusListener {
               TimeUnit.SECONDS))
             break;
         } catch (InterruptedException ex) {
-          ex.printStackTrace();
+          logger.info("Thread interrupted while updating buffer", ex);
         }
         logger.info("GeodeKafkaSource Queue is full");
       }
@@ -66,6 +66,7 @@ class GeodeKafkaSourceListener implements CqStatusListener {
   @Override
   public void onCqDisconnected() {
     // we should probably redistribute or reconnect
+    logger.info("cq has been disconnected");
   }
 
   @Override
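
The listener now logs the InterruptedException instead of printing a stack
trace. One conventional refinement, not part of this commit, is to also
restore the thread's interrupt flag so the surrounding retry loop can observe
it; a hedged sketch:

    } catch (InterruptedException ex) {
      Thread.currentThread().interrupt(); // let callers see the interrupt
      logger.info("Thread interrupted while updating buffer", ex);
    }
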
diff --git a/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java b/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java
index 24cd531..a04ea5e 100644
--- a/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java
+++ b/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java
@@ -29,7 +29,6 @@ import org.apache.kafka.connect.source.SourceTask;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.geode.cache.client.ClientCache;
 import org.apache.geode.cache.query.CqAttributes;
 import org.apache.geode.cache.query.CqAttributesFactory;
 import org.apache.geode.cache.query.CqQuery;
@@ -68,22 +67,17 @@ public class GeodeKafkaSourceTask extends SourceTask {
   @Override
   public void start(Map<String, String> props) {
     try {
-      System.out.println("NABA ::" + props);
       GeodeSourceConnectorConfig geodeConnectorConfig = new GeodeSourceConnectorConfig(props);
       logger.debug("GeodeKafkaSourceTask id:" + geodeConnectorConfig.getTaskId() + " starting");
       geodeContext = new GeodeContext();
-      final ClientCache clientCache =
-          geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(),
+      geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(),
               geodeConnectorConfig.getDurableClientId(),
               geodeConnectorConfig.getDurableClientTimeout(),
               geodeConnectorConfig.getSecurityClientAuthInit(),
               geodeConnectorConfig.getSecurityUserName(),
               geodeConnectorConfig.getSecurityPassword(),
               geodeConnectorConfig.usesSecurity());
-      if (clientCache == null) {
-        throw new ConnectException(
-            "Unable to create an client cache connected to Apache Geode cluster");
-      }
+
       batchSize = geodeConnectorConfig.getBatchSize();
       eventBufferSupplier = new SharedEventBufferSupplier(geodeConnectorConfig.getQueueSize());
 
@@ -122,7 +116,7 @@ public class GeodeKafkaSourceTask extends SourceTask {
 
   @Override
   public void stop() {
-    geodeContext.getClientCache().close(true);
+    geodeContext.close(true);
   }
 
   void installOnGeode(GeodeSourceConnectorConfig geodeConnectorConfig, GeodeContext geodeContext,
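
installOnGeode(...) presumably registers the durable CQs through the
GeodeContext.newCq(name, query, cqAttributes, isDurable) method shown in the
GeodeContext hunk above. An illustrative call, with made-up region and CQ
names and a listener variable that is not from this commit:

    CqAttributesFactory cqFactory = new CqAttributesFactory();
    cqFactory.addCqListener(listener); // e.g. a GeodeKafkaSourceListener
    CqQuery cq = geodeContext.newCq("kafka-source-exampleRegion",
        "SELECT * FROM /exampleRegion", cqFactory.create(), /* isDurable */ true);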