Posted to pr@cassandra.apache.org by GitBox <gi...@apache.org> on 2020/10/20 22:03:40 UTC

[GitHub] [cassandra-sidecar] tharanga opened a new pull request #18: CDC reader in Apache Cassandra Sidecar - CASSANDRASC-27

tharanga opened a new pull request #18:
URL: https://github.com/apache/cassandra-sidecar/pull/18


   This is a WIP version of a Cassandra change stream emitter based on the CDC feature of Cassandra 4.0-beta2.
   
   **New dependencies:**
   - Cassandra 4.0-beta2 Jar
   
   **New config:**
   - `cdc: configPath:` Path to the Cassandra server config file
   
   **Pre-read:**
   https://cassandra.apache.org/doc/latest/operating/cdc.html
   
   **How to use:**
   
   1. Enable CDC in Cassandra through cassandra.yaml : `cdc_enabled: true`
   2. Set `commitlog_sync_period_in_ms` (e.g. `10000`) according to how quickly you want to observe changes (100ms lower limit)
   3. Enable CDC on a table `ALTER TABLE <your table> WITH cdc=true;`
   4. Change sidecar config `cdc: configPath:` to point to the cassandra.yaml
   5. Start the sidecar, insert data into the CDC enabled table and you'll see changes are emitted to the log
   
   **Current limitations:**
   - The sidecar must be restarted upon schema changes
   - Other unknown bugs due to the absence of unit tests
   
   **Tasks of the initial version:**
   - [x] Read and emit changes from CDC enabled tables from the local Cassandra node
   
   - [x] Emit events in real-time, as governed by Cassandra’s commit log flush interval
   
   - [x] Emit events in the same order as they appear in the commit log (optionally trade this off for throughput)
   
   - [x] Bookmark the change stream; hence commit log read is resumable
   
   - [ ] Make bookmarking robust
   
   - [x] Provide the flexibility of emitting changes in different formats (JSON, PartitionUpdate, Avro) to different output types (Kafka, Console, etc)
   
   - [x] Add Console output
   
   - [ ] Add Kafka output
   
   - [x] Add PartitionUpdate format
   
   - [ ] Add JSON format 
   
   - [ ] (optional) Support starting a fresh change stream from a full snapshot (of CDC enabled tables)
   
   - [ ] (optional) Provide an API to take full snapshots (of CDC enabled tables)
   
   - [x] Support monitoring
   
   - [x] Support managing the CDC log
   
   - [ ] Unit and integration tests
   
   - [ ] Automatic schema change detector
   
   - [ ] Publish performance characteristics
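
A hypothetical sketch of how the format/output split from the task list could be modeled, so that a format (JSON, PartitionUpdate, Avro) and an output (Console, Kafka) can be swapped independently. The names `Change`, `ChangeOutput`, `ChangeEmitter` and `EmitterDemo`, and the string-based change record, are illustrative assumptions and not the types used in this pull request.

```java
import java.util.function.Function;

// Hypothetical stand-in for a decoded CDC change; the PR works with Cassandra's own types.
final class Change
{
    final String keyspace;
    final String table;
    final String payload;

    Change(String keyspace, String table, String payload)
    {
        this.keyspace = keyspace;
        this.table = table;
        this.payload = payload;
    }
}

// Hypothetical interface: where formatted changes go (console, Kafka, ...).
interface ChangeOutput<T>
{
    void emit(T formatted);
}

// Pairs one formatter with one output so that either side can be replaced on its own.
final class ChangeEmitter<T>
{
    private final Function<Change, T> formatter;
    private final ChangeOutput<T> output;

    ChangeEmitter(Function<Change, T> formatter, ChangeOutput<T> output)
    {
        this.formatter = formatter;
        this.output = output;
    }

    void onChange(Change change)
    {
        output.emit(formatter.apply(change));
    }
}

final class EmitterDemo
{
    // Naive JSON-like rendering for illustration only; it performs no escaping.
    static String toJson(Change c)
    {
        return "{\"keyspace\":\"" + c.keyspace + "\",\"table\":\"" + c.table
               + "\",\"payload\":\"" + c.payload + "\"}";
    }

    public static void main(String[] args)
    {
        // Console output combined with the JSON-like format.
        ChangeEmitter<String> console = new ChangeEmitter<>(EmitterDemo::toJson, System.out::println);
        console.onChange(new Change("ks", "users", "id=1"));
    }
}
```

Under this assumed design, a Kafka output would be another `ChangeOutput` implementation and the formatter would stay unchanged.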


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] [cassandra-sidecar] tharanga commented on a change in pull request #18: CDC reader in Apache Cassandra Sidecar - CASSANDRASC-27

Posted by GitBox <gi...@apache.org>.
tharanga commented on a change in pull request #18:
URL: https://github.com/apache/cassandra-sidecar/pull/18#discussion_r517720308



##########
File path: src/main/java/org/apache/cassandra/sidecar/cdc/CDCReaderService.java
##########
@@ -0,0 +1,157 @@
+package org.apache.cassandra.sidecar.cdc;
+
+import java.io.InvalidObjectException;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Host;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import io.vertx.core.logging.Logger;
+import io.vertx.core.logging.LoggerFactory;
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.sidecar.CQLSession;
+
+/**
+ * Cassandra's real-time change data capture service.
+ */
+@Singleton
+public class CDCReaderService implements Host.StateListener
+{
+    private static final Logger logger = LoggerFactory.getLogger(CDCReaderService.class);
+    private final CDCIndexWatcher cdcIndexWatcher;
+    private final CDCRawDirectoryMonitor cdcRawDirectoryMonitor;
+    private final CQLSession session;
+    private final CassandraConfig cassandraConfig;
+
+    @Inject
+    public CDCReaderService(CDCIndexWatcher cdcIndexWatcher, CDCRawDirectoryMonitor monitor, CQLSession session,
+                            CassandraConfig cassandraConfig)
+    {
+        this.cdcRawDirectoryMonitor = monitor;
+        this.cdcIndexWatcher = cdcIndexWatcher;
+        this.session = session;
+        this.cassandraConfig = cassandraConfig;
+    }
+
+    public synchronized void start()
+    {
+        try
+        {
+            // Wait until the Cassandra server is UP to load configs and start subsystems. There's no guarantee the
+            // config is valid otherwise.
+            waitForCassandraServer();
+            Cluster cluster = session.getLocalCql().getCluster();
+            if (cluster == null)
+            {
+                throw new InvalidObjectException("Cannot connect to the local Cassandra node");
+            }
+
+            // Ensure Cassandra config is valid and remove mutable data paths from the config
+            // to ensure CDC reader doesn't accidentally step on Cassandra data.
+            this.cassandraConfig.init();
+            // TODO : Load metadata from the CQLSession.
+            Schema.instance.loadFromDisk(false);

Review comment:
       Thanks Max. Added a skeleton `CDCSchemaChangeListener`. Go ahead with the change.






[GitHub] [cassandra-sidecar] tharanga commented on a change in pull request #18: CDC reader in Apache Cassandra Sidecar - CASSANDRASC-27

Posted by GitBox <gi...@apache.org>.
tharanga commented on a change in pull request #18:
URL: https://github.com/apache/cassandra-sidecar/pull/18#discussion_r517713077



##########
File path: src/main/java/org/apache/cassandra/sidecar/CQLSession.java
##########
@@ -0,0 +1,115 @@
+package org.apache.cassandra.sidecar;
+
+import java.net.InetSocketAddress;
+import java.util.Collections;
+
+import javax.annotation.Nullable;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.NettyOptions;
+import com.datastax.driver.core.QueryOptions;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.policies.ExponentialReconnectionPolicy;
+import com.datastax.driver.core.policies.ReconnectionPolicy;
+import com.datastax.driver.core.policies.RoundRobinPolicy;
+import com.datastax.driver.core.policies.WhiteListPolicy;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+/**
+ * Represents a connection to Cassandra cluster. Currently supports returning the local connection only as
+ * defined in the Configuration.
+ */
+@Singleton
+public class CQLSession

Review comment:
       Good catch. Let me delete this and use the common one.






[GitHub] [cassandra-sidecar] Maxwell-Guo commented on a change in pull request #18: CDC reader in Apache Cassandra Sidecar - CASSANDRASC-27

Posted by GitBox <gi...@apache.org>.
Maxwell-Guo commented on a change in pull request #18:
URL: https://github.com/apache/cassandra-sidecar/pull/18#discussion_r517877088



##########
File path: src/main/java/org/apache/cassandra/sidecar/cdc/CDCReaderService.java
##########
@@ -0,0 +1,157 @@
+package org.apache.cassandra.sidecar.cdc;
+
+import java.io.InvalidObjectException;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Host;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import io.vertx.core.logging.Logger;
+import io.vertx.core.logging.LoggerFactory;
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.sidecar.CQLSession;
+
+/**
+ * Cassandra's real-time change data capture service.
+ */
+@Singleton
+public class CDCReaderService implements Host.StateListener
+{
+    private static final Logger logger = LoggerFactory.getLogger(CDCReaderService.class);
+    private final CDCIndexWatcher cdcIndexWatcher;
+    private final CDCRawDirectoryMonitor cdcRawDirectoryMonitor;
+    private final CQLSession session;
+    private final CassandraConfig cassandraConfig;
+
+    @Inject
+    public CDCReaderService(CDCIndexWatcher cdcIndexWatcher, CDCRawDirectoryMonitor monitor, CQLSession session,
+                            CassandraConfig cassandraConfig)
+    {
+        this.cdcRawDirectoryMonitor = monitor;
+        this.cdcIndexWatcher = cdcIndexWatcher;
+        this.session = session;
+        this.cassandraConfig = cassandraConfig;
+    }
+
+    public synchronized void start()
+    {
+        try
+        {
+            // Wait until the Cassandra server is UP to load configs and start subsystems. There's no guarantee the
+            // config is valid otherwise.
+            waitForCassandraServer();
+            Cluster cluster = session.getLocalCql().getCluster();
+            if (cluster == null)
+            {
+                throw new InvalidObjectException("Cannot connect to the local Cassandra node");
+            }
+
+            // Ensure Cassandra config is valid and remove mutable data paths from the config
+            // to ensure CDC reader doesn't accidentally step on Cassandra data.
+            this.cassandraConfig.init();
+            // TODO : Load metadata from the CQLSession.
+            Schema.instance.loadFromDisk(false);
+            this.cassandraConfig.muteConfigs();
+
+            for (String keySpace : Schema.instance.getKeyspaces())
+            {
+                logger.info("Keyspace : {}", keySpace);
+                KeyspaceMetadata keyspaceMetadata = Schema.instance.getKeyspaceMetadata(keySpace);
+                if (keyspaceMetadata == null)
+                {
+                    continue;
+                }
+                for (TableMetadata tableMetadata : keyspaceMetadata.tablesAndViews())
+                {
+                    logger.info("Table : {}, CDC enabled ? {}", tableMetadata.name,
+                            tableMetadata.params.cdc ? "true" : "false");
+                }
+            }
+            // Start monitoring the cdc_raw directory
+            this.cdcRawDirectoryMonitor.startMonitoring();
+            // Start reading the current commit log.
+            this.cdcIndexWatcher.run();
+
+        }
+        catch (Exception ex)
+        {
+            logger.error("Error starting the CDC reader {}", ex);
+            this.stop();
+            return;
+        }
+        logger.info("Successfully started the CDC reader");
+
+    }
+
+    public synchronized void stop()
+    {
+        logger.info("Stopping CDC reader...");
+        this.cdcRawDirectoryMonitor.stop();
+        this.cdcIndexWatcher.stop();
+        logger.info("Successfully stopped the CDC reader");
+    }
+    @Override
+    public void onAdd(Host host)
+    {
+
+    }
+
+    @Override
+    public void onUp(Host host)
+    {
+
+    }
+
+    @Override
+    public void onDown(Host host)
+    {
+
+    }
+
+    @Override
+    public void onRemove(Host host)
+    {
+
+    }
+
+    @Override
+    public void onRegister(Cluster cluster)
+    {
+
+    }
+
+    @Override
+    public void onUnregister(Cluster cluster)
+    {
+
+    }
+
+    /**
+     * Waiting for the Cassandra server.
+     * */
+    private void waitForCassandraServer() throws InterruptedException
+    {
+        long retryIntervalMs = 1;
+        Cluster cluster = null;
+
+        while (cluster == null)
+        {
+            if (this.session.getLocalCql() != null)
+            {
+                cluster = session.getLocalCql().getCluster();
+            }
+            if (cluster != null)
+            {
+                break;
+            }
+            else
+            {
+                logger.info("Waiting for Cassandra server to start. Retrying after {} milliseconds",
+                        retryIntervalMs);
+                Thread.sleep(retryIntervalMs);
+                retryIntervalMs *= 2;

Review comment:
       What I mean is that having the process simply sleep in a loop indefinitely is not a good choice; we could instead throw an exception after some number of retries, so that users can check what is wrong with the process. If the daemon just sleeps, users may not know that anything is wrong: they may think the process is running healthily when in fact something is wrong with the Cassandra daemon and the CDC reader is merely sleeping.
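
For illustration, the retry limit suggested here could look roughly like the following minimal sketch of exponential backoff that gives up after a fixed number of attempts. `BoundedRetry`, `waitFor`, `maxAttempts` and the interval values are hypothetical and not part of this pull request; the `Supplier` stands in for the `session.getLocalCql()` lookup in the diff above.

```java
import java.util.function.Supplier;

// Hypothetical helper, not part of the PR: retries a probe with exponential backoff
// and gives up after maxAttempts so that the failure becomes visible to the operator.
final class BoundedRetry
{
    static <T> T waitFor(Supplier<T> probe, int maxAttempts, long initialIntervalMs, long maxIntervalMs)
    {
        long intervalMs = initialIntervalMs;
        for (int attempt = 1; attempt <= maxAttempts; attempt++)
        {
            T result = probe.get();   // null means "not up yet"
            if (result != null)
            {
                return result;
            }
            if (attempt < maxAttempts)
            {
                try
                {
                    Thread.sleep(intervalMs);
                }
                catch (InterruptedException e)
                {
                    Thread.currentThread().interrupt();   // preserve the interrupt for the caller
                    throw new IllegalStateException("Interrupted while waiting for Cassandra", e);
                }
                intervalMs = Math.min(intervalMs * 2, maxIntervalMs);   // doubling, capped
            }
        }
        throw new IllegalStateException("Cassandra server not reachable after " + maxAttempts + " attempts");
    }

    public static void main(String[] args)
    {
        int[] calls = {0};
        // The probe returns null twice and succeeds on its third call.
        String cluster = waitFor(() -> ++calls[0] < 3 ? null : "cluster", 5, 1, 8);
        System.out.println(cluster + " found after " + calls[0] + " attempts");
    }
}
```

With this shape, `start()` would catch the exception, log it, and leave the CDC reader stopped instead of sleeping silently.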






[GitHub] [cassandra-sidecar] Maxwell-Guo commented on a change in pull request #18: CDC reader in Apache Cassandra Sidecar - CASSANDRASC-27

Posted by GitBox <gi...@apache.org>.
Maxwell-Guo commented on a change in pull request #18:
URL: https://github.com/apache/cassandra-sidecar/pull/18#discussion_r517877440



##########
File path: src/main/java/org/apache/cassandra/sidecar/cdc/CDCReaderService.java
##########
@@ -0,0 +1,157 @@
+package org.apache.cassandra.sidecar.cdc;
+
+import java.io.InvalidObjectException;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Host;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import io.vertx.core.logging.Logger;
+import io.vertx.core.logging.LoggerFactory;
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.sidecar.CQLSession;
+
+/**
+ * Cassandra's real-time change data capture service.
+ */
+@Singleton
+public class CDCReaderService implements Host.StateListener
+{
+    private static final Logger logger = LoggerFactory.getLogger(CDCReaderService.class);
+    private final CDCIndexWatcher cdcIndexWatcher;
+    private final CDCRawDirectoryMonitor cdcRawDirectoryMonitor;
+    private final CQLSession session;
+    private final CassandraConfig cassandraConfig;
+
+    @Inject
+    public CDCReaderService(CDCIndexWatcher cdcIndexWatcher, CDCRawDirectoryMonitor monitor, CQLSession session,
+                            CassandraConfig cassandraConfig)
+    {
+        this.cdcRawDirectoryMonitor = monitor;
+        this.cdcIndexWatcher = cdcIndexWatcher;
+        this.session = session;
+        this.cassandraConfig = cassandraConfig;
+    }
+
+    public synchronized void start()
+    {
+        try
+        {
+            // Wait until the Cassandra server is UP to load configs and start subsystems. There's no guarantee the
+            // config is valid otherwise.
+            waitForCassandraServer();
+            Cluster cluster = session.getLocalCql().getCluster();
+            if (cluster == null)
+            {
+                throw new InvalidObjectException("Cannot connect to the local Cassandra node");
+            }
+
+            // Ensure Cassandra config is valid and remove mutable data paths from the config
+            // to ensure CDC reader doesn't accidentally step on Cassandra data.
+            this.cassandraConfig.init();
+            // TODO : Load metadata from the CQLSession.
+            Schema.instance.loadFromDisk(false);
+            this.cassandraConfig.muteConfigs();
+
+            for (String keySpace : Schema.instance.getKeyspaces())
+            {
+                logger.info("Keyspace : {}", keySpace);
+                KeyspaceMetadata keyspaceMetadata = Schema.instance.getKeyspaceMetadata(keySpace);
+                if (keyspaceMetadata == null)
+                {
+                    continue;
+                }
+                for (TableMetadata tableMetadata : keyspaceMetadata.tablesAndViews())
+                {
+                    logger.info("Table : {}, CDC enabled ? {}", tableMetadata.name,
+                            tableMetadata.params.cdc ? "true" : "false");
+                }
+            }
+            // Start monitoring the cdc_raw directory
+            this.cdcRawDirectoryMonitor.startMonitoring();
+            // Start reading the current commit log.
+            this.cdcIndexWatcher.run();
+
+        }
+        catch (Exception ex)
+        {
+            logger.error("Error starting the CDC reader {}", ex);
+            this.stop();
+            return;
+        }
+        logger.info("Successfully started the CDC reader");
+
+    }
+
+    public synchronized void stop()
+    {
+        logger.info("Stopping CDC reader...");
+        this.cdcRawDirectoryMonitor.stop();
+        this.cdcIndexWatcher.stop();
+        logger.info("Successfully stopped the CDC reader");
+    }
+    @Override
+    public void onAdd(Host host)
+    {
+
+    }
+
+    @Override
+    public void onUp(Host host)
+    {
+
+    }
+
+    @Override
+    public void onDown(Host host)
+    {
+
+    }
+
+    @Override
+    public void onRemove(Host host)
+    {
+
+    }
+
+    @Override
+    public void onRegister(Cluster cluster)
+    {
+
+    }
+
+    @Override
+    public void onUnregister(Cluster cluster)
+    {
+
+    }
+
+    /**
+     * Waiting for the Cassandra server.
+     * */
+    private void waitForCassandraServer() throws InterruptedException
+    {
+        long retryIntervalMs = 1;
+        Cluster cluster = null;
+
+        while (cluster == null)
+        {
+            if (this.session.getLocalCql() != null)
+            {
+                cluster = session.getLocalCql().getCluster();
+            }
+            if (cluster != null)
+            {
+                break;
+            }
+            else
+            {
+                logger.info("Waiting for Cassandra server to start. Retrying after {} milliseconds",
+                        retryIntervalMs);
+                Thread.sleep(retryIntervalMs);
+                retryIntervalMs *= 2;

Review comment:
       We should let the user know what is going on.






[GitHub] [cassandra-sidecar] Maxwell-Guo commented on a change in pull request #18: CDC reader in Apache Cassandra Sidecar - CASSANDRASC-27

Posted by GitBox <gi...@apache.org>.
Maxwell-Guo commented on a change in pull request #18:
URL: https://github.com/apache/cassandra-sidecar/pull/18#discussion_r524870756



##########
File path: src/main/java/org/apache/cassandra/sidecar/cdc/CDCReaderService.java
##########
@@ -0,0 +1,157 @@
+package org.apache.cassandra.sidecar.cdc;
+
+import java.io.InvalidObjectException;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Host;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import io.vertx.core.logging.Logger;
+import io.vertx.core.logging.LoggerFactory;
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.sidecar.CQLSession;
+
+/**
+ * Cassandra's real-time change data capture service.
+ */
+@Singleton
+public class CDCReaderService implements Host.StateListener
+{
+    private static final Logger logger = LoggerFactory.getLogger(CDCReaderService.class);
+    private final CDCIndexWatcher cdcIndexWatcher;
+    private final CDCRawDirectoryMonitor cdcRawDirectoryMonitor;
+    private final CQLSession session;
+    private final CassandraConfig cassandraConfig;
+
+    @Inject
+    public CDCReaderService(CDCIndexWatcher cdcIndexWatcher, CDCRawDirectoryMonitor monitor, CQLSession session,
+                            CassandraConfig cassandraConfig)
+    {
+        this.cdcRawDirectoryMonitor = monitor;
+        this.cdcIndexWatcher = cdcIndexWatcher;
+        this.session = session;
+        this.cassandraConfig = cassandraConfig;
+    }
+
+    public synchronized void start()
+    {
+        try
+        {
+            // Wait until the Cassandra server is UP to load configs and start subsystems. There's no guarantee the
+            // config is valid otherwise.
+            waitForCassandraServer();
+            Cluster cluster = session.getLocalCql().getCluster();
+            if (cluster == null)
+            {
+                throw new InvalidObjectException("Cannot connect to the local Cassandra node");
+            }
+
+            // Ensure Cassandra config is valid and remove mutable data paths from the config
+            // to ensure CDC reader doesn't accidentally step on Cassandra data.
+            this.cassandraConfig.init();
+            // TODO : Load metadata from the CQLSession.
+            Schema.instance.loadFromDisk(false);

Review comment:
       I see why you need to load the schema from disk: the commit log reader needs the schema in order to deserialize the data it reads from the log.






[GitHub] [cassandra-sidecar] tharanga commented on a change in pull request #18: CDC reader in Apache Cassandra Sidecar - CASSANDRASC-27

Posted by GitBox <gi...@apache.org>.
tharanga commented on a change in pull request #18:
URL: https://github.com/apache/cassandra-sidecar/pull/18#discussion_r517721041



##########
File path: src/main/java/org/apache/cassandra/sidecar/cdc/CDCReaderService.java
##########
@@ -0,0 +1,157 @@
+package org.apache.cassandra.sidecar.cdc;
+
+import java.io.InvalidObjectException;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Host;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import io.vertx.core.logging.Logger;
+import io.vertx.core.logging.LoggerFactory;
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.sidecar.CQLSession;
+
+/**
+ * Cassandra's real-time change data capture service.
+ */
+@Singleton
+public class CDCReaderService implements Host.StateListener
+{
+    private static final Logger logger = LoggerFactory.getLogger(CDCReaderService.class);
+    private final CDCIndexWatcher cdcIndexWatcher;
+    private final CDCRawDirectoryMonitor cdcRawDirectoryMonitor;
+    private final CQLSession session;
+    private final CassandraConfig cassandraConfig;
+
+    @Inject
+    public CDCReaderService(CDCIndexWatcher cdcIndexWatcher, CDCRawDirectoryMonitor monitor, CQLSession session,
+                            CassandraConfig cassandraConfig)
+    {
+        this.cdcRawDirectoryMonitor = monitor;
+        this.cdcIndexWatcher = cdcIndexWatcher;
+        this.session = session;
+        this.cassandraConfig = cassandraConfig;
+    }
+
+    public synchronized void start()
+    {
+        try
+        {
+            // Wait until the Cassandra server is UP to load configs and start subsystems. There's no guarantee the
+            // config is valid otherwise.
+            waitForCassandraServer();
+            Cluster cluster = session.getLocalCql().getCluster();
+            if (cluster == null)
+            {
+                throw new InvalidObjectException("Cannot connect to the local Cassandra node");
+            }
+
+            // Ensure Cassandra config is valid and remove mutable data paths from the config
+            // to ensure CDC reader doesn't accidentally step on Cassandra data.
+            this.cassandraConfig.init();
+            // TODO : Load metadata from the CQLSession.
+            Schema.instance.loadFromDisk(false);
+            this.cassandraConfig.muteConfigs();
+
+            for (String keySpace : Schema.instance.getKeyspaces())
+            {
+                logger.info("Keyspace : {}", keySpace);
+                KeyspaceMetadata keyspaceMetadata = Schema.instance.getKeyspaceMetadata(keySpace);
+                if (keyspaceMetadata == null)
+                {
+                    continue;
+                }
+                for (TableMetadata tableMetadata : keyspaceMetadata.tablesAndViews())
+                {
+                    logger.info("Table : {}, CDC enabled ? {}", tableMetadata.name,

Review comment:
       Yes, this is just dummy code for interim development work. I will remove these logs or move them to debug level.






[GitHub] [cassandra-sidecar] tharanga commented on a change in pull request #18: CDC reader in Apache Cassandra Sidecar - CASSANDRASC-27

Posted by GitBox <gi...@apache.org>.
tharanga commented on a change in pull request #18:
URL: https://github.com/apache/cassandra-sidecar/pull/18#discussion_r517728256



##########
File path: src/main/java/org/apache/cassandra/sidecar/cdc/CDCReaderService.java
##########
@@ -0,0 +1,157 @@
+package org.apache.cassandra.sidecar.cdc;
+
+import java.io.InvalidObjectException;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Host;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import io.vertx.core.logging.Logger;
+import io.vertx.core.logging.LoggerFactory;
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.sidecar.CQLSession;
+
+/**
+ * Cassandra's real-time change data capture service.
+ */
+@Singleton
+public class CDCReaderService implements Host.StateListener
+{
+    private static final Logger logger = LoggerFactory.getLogger(CDCReaderService.class);
+    private final CDCIndexWatcher cdcIndexWatcher;
+    private final CDCRawDirectoryMonitor cdcRawDirectoryMonitor;
+    private final CQLSession session;
+    private final CassandraConfig cassandraConfig;
+
+    @Inject
+    public CDCReaderService(CDCIndexWatcher cdcIndexWatcher, CDCRawDirectoryMonitor monitor, CQLSession session,
+                            CassandraConfig cassandraConfig)
+    {
+        this.cdcRawDirectoryMonitor = monitor;
+        this.cdcIndexWatcher = cdcIndexWatcher;
+        this.session = session;
+        this.cassandraConfig = cassandraConfig;
+    }
+
+    public synchronized void start()
+    {
+        try
+        {
+            // Wait until the Cassandra server is UP to load configs and start subsystems. There's no guarantee the
+            // config is valid otherwise.
+            waitForCassandraServer();
+            Cluster cluster = session.getLocalCql().getCluster();
+            if (cluster == null)
+            {
+                throw new InvalidObjectException("Cannot connect to the local Cassandra node");
+            }
+
+            // Ensure Cassandra config is valid and remove mutable data paths from the config
+            // to ensure CDC reader doesn't accidentally step on Cassandra data.
+            this.cassandraConfig.init();
+            // TODO : Load metadata from the CQLSession.
+            Schema.instance.loadFromDisk(false);
+            this.cassandraConfig.muteConfigs();
+
+            for (String keySpace : Schema.instance.getKeyspaces())
+            {
+                logger.info("Keyspace : {}", keySpace);
+                KeyspaceMetadata keyspaceMetadata = Schema.instance.getKeyspaceMetadata(keySpace);
+                if (keyspaceMetadata == null)
+                {
+                    continue;
+                }
+                for (TableMetadata tableMetadata : keyspaceMetadata.tablesAndViews())
+                {
+                    logger.info("Table : {}, CDC enabled ? {}", tableMetadata.name,
+                            tableMetadata.params.cdc ? "true" : "false");
+                }
+            }
+            // Start monitoring the cdc_raw directory
+            this.cdcRawDirectoryMonitor.startMonitoring();
+            // Start reading the current commit log.
+            this.cdcIndexWatcher.run();
+
+        }
+        catch (Exception ex)
+        {
+            logger.error("Error starting the CDC reader {}", ex);
+            this.stop();
+            return;
+        }
+        logger.info("Successfully started the CDC reader");
+
+    }
+
+    public synchronized void stop()
+    {
+        logger.info("Stopping CDC reader...");
+        this.cdcRawDirectoryMonitor.stop();
+        this.cdcIndexWatcher.stop();
+        logger.info("Successfully stopped the CDC reader");
+    }
+    @Override
+    public void onAdd(Host host)
+    {
+
+    }
+
+    @Override
+    public void onUp(Host host)
+    {
+
+    }
+
+    @Override
+    public void onDown(Host host)
+    {
+
+    }
+
+    @Override
+    public void onRemove(Host host)
+    {
+
+    }
+
+    @Override
+    public void onRegister(Cluster cluster)
+    {
+
+    }
+
+    @Override
+    public void onUnregister(Cluster cluster)
+    {
+
+    }
+
+    /**
+     * Waiting for the Cassandra server.
+     * */
+    private void waitForCassandraServer() throws InterruptedException
+    {
+        long retryIntervalMs = 1;
+        Cluster cluster = null;
+
+        while (cluster == null)
+        {
+            if (this.session.getLocalCql() != null)
+            {
+                cluster = session.getLocalCql().getCluster();
+            }
+            if (cluster != null)
+            {
+                break;
+            }
+            else
+            {
+                logger.info("Waiting for Cassandra server to start. Retrying after {} milliseconds",
+                        retryIntervalMs);
+                Thread.sleep(retryIntervalMs);
+                retryIntervalMs *= 2;

Review comment:
       I'm not sure a retry limit is helpful. For the sidecar to do anything useful, it has to wait until Cassandra starts. If this throws an exception after a certain number of retries, does that mean the sidecar has to stop? Or do the other services start without the CDC reader?
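
If the reader is meant to wait indefinitely, as this comment suggests, one option is to keep retrying but cap the doubling interval so the sleep does not grow without bound. A minimal sketch under stated assumptions: `CappedBackoff`, `nextDelayMs`, and `waitUntil` are hypothetical names that are not part of the PR, and the `BooleanSupplier` stands in for the `session.getLocalCql() != null` check.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical helper, not part of the PR: waits without a retry limit but caps the delay. */
public final class CappedBackoff
{
    private CappedBackoff() {}

    /** Doubles the current delay, never returning more than maxDelayMs. */
    public static long nextDelayMs(long currentDelayMs, long maxDelayMs)
    {
        // Compare against max / 2 first so the doubling cannot overflow.
        if (currentDelayMs >= maxDelayMs / 2)
        {
            return maxDelayMs;
        }
        return currentDelayMs * 2;
    }

    /** Blocks until ready reports true; returns the number of failed checks. */
    public static int waitUntil(BooleanSupplier ready, long initialDelayMs, long maxDelayMs)
            throws InterruptedException
    {
        if (initialDelayMs <= 0 || maxDelayMs < initialDelayMs)
        {
            throw new IllegalArgumentException("delays must satisfy 0 < initial <= max");
        }
        long delayMs = initialDelayMs;
        int failedChecks = 0;
        while (!ready.getAsBoolean())
        {
            failedChecks++;
            Thread.sleep(delayMs);
            delayMs = nextDelayMs(delayMs, maxDelayMs);
        }
        return failedChecks;
    }
}
```

With an initial delay of 1 ms and a cap of, say, 30000 ms, the sleeps would run 1, 2, 4, ... and then stay at the cap, so the log line about retrying is emitted at a steady rate while Cassandra is down.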




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] [cassandra-sidecar] Maxwell-Guo commented on a change in pull request #18: CDC reader in Apache Cassandra Sidecar - CASSANDRASC-27

Posted by GitBox <gi...@apache.org>.
Maxwell-Guo commented on a change in pull request #18:
URL: https://github.com/apache/cassandra-sidecar/pull/18#discussion_r521982181



##########
File path: src/main/java/org/apache/cassandra/sidecar/cdc/CDCBookmark.java
##########
@@ -0,0 +1,213 @@
+package org.apache.cassandra.sidecar.cdc;
+
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.locks.ReentrantLock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.commitlog.CommitLogDescriptor;
+import org.apache.cassandra.db.commitlog.CommitLogPosition;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.sidecar.Configuration;
+
+/**
+ * Manages the CDC reader bookmark. This tracks the last successfully processed offset
+ * of a commit log.
+ */
+@Singleton
+public class CDCBookmark extends TimerTask
+{
+    // Tracks last disk sync'd commit log position.
+    private CommitLogPosition lastSyncedPosition;
+    // Tracks last successfully processed commit log position by the CDC reader.
+    private CommitLogPosition lastProcessedPosition;
+    private final Timer timer;
+    private static final String BOOKMARK = "CdcReader.bookmark";
+    private static final Logger logger = LoggerFactory.getLogger(CDCBookmark.class);
+    private final ReentrantLock bookmarkLock = new ReentrantLock();
+    private final Configuration conf;
+
+    @Inject
+    CDCBookmark(Configuration conf)
+    {
+        this.lastSyncedPosition = null;
+        this.lastProcessedPosition = null;
+        this.conf = conf;
+        this.timer = new Timer();
+    }
+
+    /**
+     * Persists last successfully processed commit log offset to the disk.
+     */
+    public void syncBookmark()
+    {
+        CommitLogPosition lastPosition = this.getLastProcessedPosition();
+
+        if (lastPosition == null)
+        {
+            return;
+        }
+        logger.debug("Last processed bookmark {}", this.lastProcessedPosition.toString());
+        try
+        {
+            if (lastPosition.equals(this.lastSyncedPosition))
+            {
+                return;
+            }
+
+            CommitLogPosition.CommitLogPositionSerializer serializer =
+                    new CommitLogPosition.CommitLogPositionSerializer();
+
+            // TODO: JSON ser-de and write-rename instead of writing directly to the bookmark
+            try (FileOutputStream fileOutputStream = new FileOutputStream(
+                    new File(this.getBookmarkPath())))
+            {
+                DataOutputPlus outBuffer = new DataOutputBuffer();
+                serializer.serialize(lastPosition, outBuffer);
+                fileOutputStream.write(((DataOutputBuffer) outBuffer).getData());
+                fileOutputStream.flush();
+                this.lastSyncedPosition = lastPosition;
+                logger.info("Successfully synced bookmark {} to the file {}", this.lastSyncedPosition.toString(),
+                        this.getBookmarkPath());
+            }
+            catch (IOException e)
+            {
+                logger.error("Error when writing bookmark {} to the file {}", lastPosition.toString(),
+                        this.getBookmarkPath());
+            }
+        }
+        catch (Exception ex)
+        {
+            logger.error("Sync exception {}", ex.getMessage());
+        }
+    }
+
+    /**
+     * Gets the path to the CDC reader bookmark.
+     *
+     * @return complete path to the bookmark file.
+     */
+    public String getBookmarkPath()
+    {
+        return String.format("%s/%s", DatabaseDescriptor.getCDCLogLocation(),
+                BOOKMARK);
+    }
+
+    @Override
+    public void run()
+    {
+        this.syncBookmark();
+    }
+
+    /**
+     * Gets the last successfully processed commit log offset.
+     * This method is thread safe.
+     *
+     * @return last successfully processed commit log offset.
+     */
+    public CommitLogPosition getLastProcessedPosition()
+    {
+        CommitLogPosition lastPosition = null;
+        try
+        {
+            bookmarkLock.lock();
+            if (this.lastProcessedPosition != null)
+            {
+                lastPosition = new CommitLogPosition(this.lastProcessedPosition.segmentId,
+                        this.lastProcessedPosition.position);
+
+            }
+        }
+        finally
+        {
+            bookmarkLock.unlock();
+        }
+        return lastPosition;
+    }
+
+    /**
+     * Sets the last successfully processed commit log offset.
+     * This method is thread safe.
+     *
+     */
+    public void setLastProcessedPosition(CommitLogPosition processedPosition)
+    {
+        try
+        {
+            bookmarkLock.lock();
+            this.lastProcessedPosition = processedPosition;
+        }
+        finally
+        {
+            bookmarkLock.unlock();
+        }
+    }
+
+    /**
+     * Starts the background thread to write processed commit log positions to the disk.
+     * */
+    public void startBookmarkSync()
+    {
+        timer.schedule(this, 0, DatabaseDescriptor.getCommitLogSyncPeriod());

Review comment:
       I don't know which JDK version you use. For me, the Timer class's schedule
   function does not accept a delay argument less than 1; it must be greater than 0,
   or an exception is thrown.
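
For reference, the `java.util.Timer` Javadoc documents that `schedule(TimerTask, long delay, long period)` throws `IllegalArgumentException` when `delay` is negative or when `period` is zero or negative; a `delay` of 0 is listed as valid. Since the value of `DatabaseDescriptor.getCommitLogSyncPeriod()` is passed as the period here, a non-positive value from the config would be one possible way to hit that exception. The sketch below uses a hypothetical `TimerArgsCheck` class (not part of the PR) to probe which argument combinations a given JDK rejects.

```java
import java.util.Timer;
import java.util.TimerTask;

/** Hypothetical probe, not part of the PR: checks which Timer.schedule arguments are accepted. */
public final class TimerArgsCheck
{
    private TimerArgsCheck() {}

    /** Returns true if Timer.schedule accepts the given delay and period. */
    public static boolean accepts(long delayMs, long periodMs)
    {
        // Daemon timer thread, so a forgotten timer cannot keep the JVM alive.
        Timer timer = new Timer(true);
        try
        {
            timer.schedule(new TimerTask()
            {
                @Override
                public void run()
                {
                    // Intentionally empty: only the argument validation is of interest.
                }
            }, delayMs, periodMs);
            return true;
        }
        catch (IllegalArgumentException e)
        {
            return false;
        }
        finally
        {
            timer.cancel();
        }
    }
}
```

Calling `TimerArgsCheck.accepts(0, 100)`, `accepts(-1, 100)`, and `accepts(0, 0)` on the JDK in question shows whether the delay or the period is the argument being rejected.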






[GitHub] [cassandra-sidecar] tharanga commented on a change in pull request #18: CDC reader in Apache Cassandra Sidecar - CASSANDRASC-27

Posted by GitBox <gi...@apache.org>.
tharanga commented on a change in pull request #18:
URL: https://github.com/apache/cassandra-sidecar/pull/18#discussion_r517705057



##########
File path: src/main/dist/conf/sidecar.yaml
##########
@@ -24,3 +24,6 @@ sidecar:
 
 healthcheck:
   - poll_freq_millis: 30000
+
+cdc:

Review comment:
       I'd wait for a general need and then refactor it out of the cdc section. We can do it now if there's such a need.






[GitHub] [cassandra-sidecar] Maxwell-Guo commented on a change in pull request #18: CDC reader in Apache Cassandra Sidecar - CASSANDRASC-27

Posted by GitBox <gi...@apache.org>.
Maxwell-Guo commented on a change in pull request #18:
URL: https://github.com/apache/cassandra-sidecar/pull/18#discussion_r513323664



##########
File path: src/main/java/org/apache/cassandra/sidecar/cdc/CDCReaderService.java
##########
@@ -0,0 +1,157 @@
+package org.apache.cassandra.sidecar.cdc;
+
+import java.io.InvalidObjectException;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Host;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import io.vertx.core.logging.Logger;
+import io.vertx.core.logging.LoggerFactory;
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.sidecar.CQLSession;
+
+/**
+ * Cassandra's real-time change data capture service.
+ */
+@Singleton
+public class CDCReaderService implements Host.StateListener
+{
+    private static final Logger logger = LoggerFactory.getLogger(CDCReaderService.class);
+    private final CDCIndexWatcher cdcIndexWatcher;
+    private final CDCRawDirectoryMonitor cdcRawDirectoryMonitor;
+    private final CQLSession session;
+    private final CassandraConfig cassandraConfig;
+
+    @Inject
+    public CDCReaderService(CDCIndexWatcher cdcIndexWatcher, CDCRawDirectoryMonitor monitor, CQLSession session,
+                            CassandraConfig cassandraConfig)
+    {
+        this.cdcRawDirectoryMonitor = monitor;
+        this.cdcIndexWatcher = cdcIndexWatcher;
+        this.session = session;
+        this.cassandraConfig = cassandraConfig;
+    }
+
+    public synchronized void start()
+    {
+        try
+        {
+            // Wait until the Cassandra server is UP to load configs and start subsystems. There's no guarantee the
+            // config is valid otherwise.
+            waitForCassandraServer();
+            Cluster cluster = session.getLocalCql().getCluster();
+            if (cluster == null)
+            {
+                throw new InvalidObjectException("Cannot connect to the local Cassandra node");
+            }
+
+            // Ensure Cassandra config is valid and remove mutable data paths from the config
+            // to ensure CDC reader doesn't accidentally step on Cassandra data.
+            this.cassandraConfig.init();
+            // TODO : Load metadata from the CQLSession.
+            Schema.instance.loadFromDisk(false);

Review comment:
       Why do we load from disk? For all Cassandra schema information we can rely on the Cassandra driver: make the first connection to Cassandra and get the information from the driver, including the table metadata we need to check whether CDC is enabled on a table.

##########
File path: src/main/java/org/apache/cassandra/sidecar/cdc/CDCReaderService.java
##########
@@ -0,0 +1,157 @@
+package org.apache.cassandra.sidecar.cdc;
+
+import java.io.InvalidObjectException;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Host;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import io.vertx.core.logging.Logger;
+import io.vertx.core.logging.LoggerFactory;
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.sidecar.CQLSession;
+
+/**
+ * Cassandra's real-time change data capture service.
+ */
+@Singleton
+public class CDCReaderService implements Host.StateListener
+{
+    private static final Logger logger = LoggerFactory.getLogger(CDCReaderService.class);
+    private final CDCIndexWatcher cdcIndexWatcher;
+    private final CDCRawDirectoryMonitor cdcRawDirectoryMonitor;
+    private final CQLSession session;
+    private final CassandraConfig cassandraConfig;
+
+    @Inject
+    public CDCReaderService(CDCIndexWatcher cdcIndexWatcher, CDCRawDirectoryMonitor monitor, CQLSession session,
+                            CassandraConfig cassandraConfig)
+    {
+        this.cdcRawDirectoryMonitor = monitor;
+        this.cdcIndexWatcher = cdcIndexWatcher;
+        this.session = session;
+        this.cassandraConfig = cassandraConfig;
+    }
+
+    public synchronized void start()
+    {
+        try
+        {
+            // Wait until the Cassandra server is UP to load configs and start subsystems. There's no guarantee the
+            // config is valid otherwise.
+            waitForCassandraServer();
+            Cluster cluster = session.getLocalCql().getCluster();
+            if (cluster == null)
+            {
+                throw new InvalidObjectException("Cannot connect to the local Cassandra node");
+            }
+
+            // Ensure Cassandra config is valid and remove mutable data paths from the config
+            // to ensure CDC reader doesn't accidentally step on Cassandra data.
+            this.cassandraConfig.init();
+            // TODO : Load metadata from the CQLSession.
+            Schema.instance.loadFromDisk(false);
+            this.cassandraConfig.muteConfigs();
+
+            for (String keySpace : Schema.instance.getKeyspaces())
+            {
+                logger.info("Keyspace : {}", keySpace);
+                KeyspaceMetadata keyspaceMetadata = Schema.instance.getKeyspaceMetadata(keySpace);
+                if (keyspaceMetadata == null)
+                {
+                    continue;
+                }
+                for (TableMetadata tableMetadata : keyspaceMetadata.tablesAndViews())
+                {
+                    logger.info("Table : {}, CDC enabled ? {}", tableMetadata.name,
+                            tableMetadata.params.cdc ? "true" : "false");
+                }
+            }
+            // Start monitoring the cdc_raw directory
+            this.cdcRawDirectoryMonitor.startMonitoring();
+            // Start reading the current commit log.
+            this.cdcIndexWatcher.run();
+
+        }
+        catch (Exception ex)
+        {
+            logger.error("Error starting the CDC reader {}", ex);
+            this.stop();
+            return;
+        }
+        logger.info("Successfully started the CDC reader");
+
+    }
+
+    public synchronized void stop()
+    {
+        logger.info("Stopping CDC reader...");
+        this.cdcRawDirectoryMonitor.stop();
+        this.cdcIndexWatcher.stop();
+        logger.info("Successfully stopped the CDC reader");
+    }
+    @Override
+    public void onAdd(Host host)
+    {
+
+    }
+
+    @Override
+    public void onUp(Host host)
+    {
+
+    }
+
+    @Override
+    public void onDown(Host host)
+    {
+
+    }
+
+    @Override
+    public void onRemove(Host host)
+    {
+
+    }
+
+    @Override
+    public void onRegister(Cluster cluster)
+    {
+
+    }
+
+    @Override
+    public void onUnregister(Cluster cluster)
+    {
+
+    }
+
+    /**
+     * Waiting for the Cassandra server.
+     * */
+    private void waitForCassandraServer() throws InterruptedException
+    {
+        long retryIntervalMs = 1;
+        Cluster cluster = null;
+
+        while (cluster == null)
+        {
+            if (this.session.getLocalCql() != null)
+            {
+                cluster = session.getLocalCql().getCluster();
+            }
+            if (cluster != null)
+            {
+                break;
+            }
+            else
+            {
+                logger.info("Waiting for Cassandra server to start. Retrying after {} milliseconds",
+                        retryIntervalMs);
+                Thread.sleep(retryIntervalMs);
+                retryIntervalMs *= 2;

Review comment:
       I modified the code as follows:
           long retryIntervalMs = 10;
           int defaultRetryTime = 5;
           // Declared outside the loop so the count is not reset on every iteration.
           int retryTime = 0;
           Cluster cluster = null;
   
           while (cluster == null)
           {
               if (this.session.getLocalCql() != null)
               {
                   cluster = session.getLocalCql().getCluster();
               }
               if (cluster != null)
               {
                   break;
               }
               else
               {
                   logger.warn("Waiting for Cassandra server to start. Retrying after {} milliseconds",
                       retryIntervalMs);
                   if (retryTime++ >= defaultRetryTime)
                       throw new InterruptedException(String.format("Cannot connect to Cassandra after %s retries", defaultRetryTime));
                   Thread.sleep(retryIntervalMs);
                   retryIntervalMs *= 2;
               }
           }

##########
File path: src/main/java/org/apache/cassandra/sidecar/cdc/CDCReaderService.java
##########
@@ -0,0 +1,157 @@
+package org.apache.cassandra.sidecar.cdc;
+
+import java.io.InvalidObjectException;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Host;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import io.vertx.core.logging.Logger;
+import io.vertx.core.logging.LoggerFactory;
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.sidecar.CQLSession;
+
+/**
+ * Cassandra's real-time change data capture service.
+ */
+@Singleton
+public class CDCReaderService implements Host.StateListener
+{
+    private static final Logger logger = LoggerFactory.getLogger(CDCReaderService.class);
+    private final CDCIndexWatcher cdcIndexWatcher;
+    private final CDCRawDirectoryMonitor cdcRawDirectoryMonitor;
+    private final CQLSession session;
+    private final CassandraConfig cassandraConfig;
+
+    @Inject
+    public CDCReaderService(CDCIndexWatcher cdcIndexWatcher, CDCRawDirectoryMonitor monitor, CQLSession session,
+                            CassandraConfig cassandraConfig)
+    {
+        this.cdcRawDirectoryMonitor = monitor;
+        this.cdcIndexWatcher = cdcIndexWatcher;
+        this.session = session;
+        this.cassandraConfig = cassandraConfig;
+    }
+
+    public synchronized void start()
+    {
+        try
+        {
+            // Wait until the Cassandra server is UP to load configs and start subsystems. There's no guarantee the
+            // config is valid otherwise.
+            waitForCassandraServer();
+            Cluster cluster = session.getLocalCql().getCluster();
+            if (cluster == null)
+            {
+                throw new InvalidObjectException("Cannot connect to the local Cassandra node");
+            }
+
+            // Ensure Cassandra config is valid and remove mutable data paths from the config
+            // to ensure CDC reader doesn't accidentally step on Cassandra data.
+            this.cassandraConfig.init();
+            // TODO : Load metadata from the CQLSession.
+            Schema.instance.loadFromDisk(false);

Review comment:
       I think I can help with this TODO; I am working on it now.

##########
File path: src/main/java/org/apache/cassandra/sidecar/cdc/CDCReaderService.java
##########
@@ -0,0 +1,157 @@
+package org.apache.cassandra.sidecar.cdc;
+
+import java.io.InvalidObjectException;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Host;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import io.vertx.core.logging.Logger;
+import io.vertx.core.logging.LoggerFactory;
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.sidecar.CQLSession;
+
+/**
+ * Cassandra's real-time change data capture service.
+ */
+@Singleton
+public class CDCReaderService implements Host.StateListener
+{
+    private static final Logger logger = LoggerFactory.getLogger(CDCReaderService.class);
+    private final CDCIndexWatcher cdcIndexWatcher;
+    private final CDCRawDirectoryMonitor cdcRawDirectoryMonitor;
+    private final CQLSession session;
+    private final CassandraConfig cassandraConfig;
+
+    @Inject
+    public CDCReaderService(CDCIndexWatcher cdcIndexWatcher, CDCRawDirectoryMonitor monitor, CQLSession session,
+                            CassandraConfig cassandraConfig)
+    {
+        this.cdcRawDirectoryMonitor = monitor;
+        this.cdcIndexWatcher = cdcIndexWatcher;
+        this.session = session;
+        this.cassandraConfig = cassandraConfig;
+    }
+
+    public synchronized void start()
+    {
+        try
+        {
+            // Wait until the Cassandra server is UP to load configs and start subsystems. There's no guarantee the
+            // config is valid otherwise.
+            waitForCassandraServer();
+            Cluster cluster = session.getLocalCql().getCluster();
+            if (cluster == null)
+            {
+                throw new InvalidObjectException("Cannot connect to the local Cassandra node");
+            }
+
+            // Ensure Cassandra config is valid and remove mutable data paths from the config
+            // to ensure CDC reader doesn't accidentally step on Cassandra data.
+            this.cassandraConfig.init();
+            // TODO : Load metadata from the CQLSession.
+            Schema.instance.loadFromDisk(false);
+            this.cassandraConfig.muteConfigs();
+
+            for (String keySpace : Schema.instance.getKeyspaces())
+            {
+                logger.info("Keyspace : {}", keySpace);
+                KeyspaceMetadata keyspaceMetadata = Schema.instance.getKeyspaceMetadata(keySpace);
+                if (keyspaceMetadata == null)
+                {
+                    continue;
+                }
+                for (TableMetadata tableMetadata : keyspaceMetadata.tablesAndViews())
+                {
+                    logger.info("Table : {}, CDC enabled ? {}", tableMetadata.name,
+                            tableMetadata.params.cdc ? "true" : "false");
+                }
+            }
+            // Start monitoring the cdc_raw directory
+            this.cdcRawDirectoryMonitor.startMonitoring();
+            // Start reading the current commit log.
+            this.cdcIndexWatcher.run();
+
+        }
+        catch (Exception ex)
+        {
+            logger.error("Error starting the CDC reader {}", ex);
+            this.stop();
+            return;
+        }
+        logger.info("Successfully started the CDC reader");
+
+    }
+
+    public synchronized void stop()
+    {
+        logger.info("Stopping CDC reader...");
+        this.cdcRawDirectoryMonitor.stop();
+        this.cdcIndexWatcher.stop();
+        logger.info("Successfully stopped the CDC reader");
+    }
+    @Override
+    public void onAdd(Host host)
+    {
+
+    }
+
+    @Override
+    public void onUp(Host host)
+    {
+
+    }
+
+    @Override
+    public void onDown(Host host)
+    {
+
+    }
+
+    @Override
+    public void onRemove(Host host)
+    {
+
+    }
+
+    @Override
+    public void onRegister(Cluster cluster)
+    {
+
+    }
+
+    @Override
+    public void onUnregister(Cluster cluster)
+    {
+
+    }
+
+    /**
+     * Waiting for the Cassandra server.
+     * */
+    private void waitForCassandraServer() throws InterruptedException
+    {
+        long retryIntervalMs = 1;
+        Cluster cluster = null;
+
+        while (cluster == null)
+        {
+            if (this.session.getLocalCql() != null)
+            {
+                cluster = session.getLocalCql().getCluster();
+            }
+            if (cluster != null)
+            {
+                break;
+            }
+            else
+            {
+                logger.info("Waiting for Cassandra server to start. Retrying after {} milliseconds",
+                        retryIntervalMs);
+                Thread.sleep(retryIntervalMs);
+                retryIntervalMs *= 2;

Review comment:
       Do you think we should add a retry limit? As written, the program will keep trying to connect to Cassandra forever.
   I think we can add a default retry count such as 3, with an initial retryIntervalMs of 10 ms: retry 3 times, and if the cluster is still null, throw an exception.
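
The bounded retry proposed here could be sketched as follows. This is an illustration under stated assumptions, not the PR's code: `BoundedRetry` and `awaitNonNull` are hypothetical names, the `Supplier` stands in for the `session.getLocalCql()` lookup, and throwing `IllegalStateException` on exhaustion is one choice among several.

```java
import java.util.function.Supplier;

/** Hypothetical helper, not part of the PR: retries a lookup a limited number of times. */
public final class BoundedRetry
{
    private BoundedRetry() {}

    /**
     * Polls source until it yields a non-null value, sleeping between attempts and
     * doubling the sleep each time. Gives up once maxRetries retries have failed.
     */
    public static <T> T awaitNonNull(Supplier<T> source, int maxRetries, long initialDelayMs)
            throws InterruptedException
    {
        long delayMs = initialDelayMs;
        // The counter lives outside the loop so it accumulates across attempts.
        int retries = 0;
        while (true)
        {
            T value = source.get();
            if (value != null)
            {
                return value;
            }
            if (retries >= maxRetries)
            {
                throw new IllegalStateException("No value after " + maxRetries + " retries");
            }
            retries++;
            Thread.sleep(delayMs);
            delayMs *= 2;
        }
    }
}
```

With the values suggested in the comment, a call such as `BoundedRetry.awaitNonNull(lookup, 3, 10L)` would make one initial attempt plus up to three retries, sleeping 10, 20, and 40 ms in between. Whether a failure should stop the whole sidecar or only the CDC reader is left to the caller that catches the exception.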

##########
File path: src/main/java/org/apache/cassandra/sidecar/cdc/CDCReaderService.java
##########
@@ -0,0 +1,157 @@
+package org.apache.cassandra.sidecar.cdc;
+
+import java.io.InvalidObjectException;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Host;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import io.vertx.core.logging.Logger;
+import io.vertx.core.logging.LoggerFactory;
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.sidecar.CQLSession;
+
+/**
+ * Cassandra's real-time change data capture service.
+ */
+@Singleton
+public class CDCReaderService implements Host.StateListener
+{
+    private static final Logger logger = LoggerFactory.getLogger(CDCReaderService.class);
+    private final CDCIndexWatcher cdcIndexWatcher;
+    private final CDCRawDirectoryMonitor cdcRawDirectoryMonitor;
+    private final CQLSession session;
+    private final CassandraConfig cassandraConfig;
+
+    @Inject
+    public CDCReaderService(CDCIndexWatcher cdcIndexWatcher, CDCRawDirectoryMonitor monitor, CQLSession session,
+                            CassandraConfig cassandraConfig)
+    {
+        this.cdcRawDirectoryMonitor = monitor;
+        this.cdcIndexWatcher = cdcIndexWatcher;
+        this.session = session;
+        this.cassandraConfig = cassandraConfig;
+    }
+
+    public synchronized void start()
+    {
+        try
+        {
+            // Wait until the Cassandra server is UP to load configs and start subsystems. There's no guarantee the
+            // config is valid otherwise.
+            waitForCassandraServer();
+            Cluster cluster = session.getLocalCql().getCluster();
+            if (cluster == null)
+            {
+                throw new InvalidObjectException("Cannot connect to the local Cassandra node");
+            }
+
+            // Ensure Cassandra config is valid and remove mutable data paths from the config
+            // to ensure CDC reader doesn't accidentally step on Cassandra data.
+            this.cassandraConfig.init();
+            // TODO : Load metadata from the CQLSession.
+            Schema.instance.loadFromDisk(false);

Review comment:
       I also saw your "TODO" here.






[GitHub] [cassandra-sidecar] tharanga commented on a change in pull request #18: CDC reader in Apache Cassandra Sidecar - CASSANDRASC-27

Posted by GitBox <gi...@apache.org>.
tharanga commented on a change in pull request #18:
URL: https://github.com/apache/cassandra-sidecar/pull/18#discussion_r517706800



##########
File path: src/main/java/org/apache/cassandra/sidecar/CQLSession.java
##########
@@ -0,0 +1,115 @@
+package org.apache.cassandra.sidecar;

Review comment:
       Thanks! I will add this header to all new files.






[GitHub] [cassandra-sidecar] tharanga commented on a change in pull request #18: CDC reader in Apache Cassandra Sidecar - CASSANDRASC-27

Posted by GitBox <gi...@apache.org>.
tharanga commented on a change in pull request #18:
URL: https://github.com/apache/cassandra-sidecar/pull/18#discussion_r517721416



##########
File path: src/main/java/org/apache/cassandra/sidecar/cdc/CDCReaderService.java
##########
@@ -0,0 +1,157 @@
+package org.apache.cassandra.sidecar.cdc;
+
+import java.io.InvalidObjectException;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Host;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import io.vertx.core.logging.Logger;
+import io.vertx.core.logging.LoggerFactory;
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.sidecar.CQLSession;
+
+/**
+ * Cassandra's real-time change data capture service.
+ */
+@Singleton
+public class CDCReaderService implements Host.StateListener
+{
+    private static final Logger logger = LoggerFactory.getLogger(CDCReaderService.class);
+    private final CDCIndexWatcher cdcIndexWatcher;
+    private final CDCRawDirectoryMonitor cdcRawDirectoryMonitor;
+    private final CQLSession session;
+    private final CassandraConfig cassandraConfig;
+
+    @Inject
+    public CDCReaderService(CDCIndexWatcher cdcIndexWatcher, CDCRawDirectoryMonitor monitor, CQLSession session,

Review comment:
       To the `CassandraSidecarDaemon`?






[GitHub] [cassandra-sidecar] tharanga commented on a change in pull request #18: CDC reader in Apache Cassandra Sidecar - CASSANDRASC-27

Posted by GitBox <gi...@apache.org>.
tharanga commented on a change in pull request #18:
URL: https://github.com/apache/cassandra-sidecar/pull/18#discussion_r517716715



##########
File path: src/main/java/org/apache/cassandra/sidecar/cdc/CDCBookmark.java
##########
@@ -0,0 +1,213 @@
+package org.apache.cassandra.sidecar.cdc;
+

Review comment:
       +1






[GitHub] [cassandra-sidecar] Maxwell-Guo commented on a change in pull request #18: CDC reader in Apache Cassandra Sidecar - CASSANDRASC-27

Posted by GitBox <gi...@apache.org>.
Maxwell-Guo commented on a change in pull request #18:
URL: https://github.com/apache/cassandra-sidecar/pull/18#discussion_r512450830



##########
File path: src/main/java/org/apache/cassandra/sidecar/CQLSession.java
##########
@@ -0,0 +1,115 @@
+package org.apache.cassandra.sidecar;

Review comment:
       Shouldn't this file have the Apache license header?
   /*
    * Licensed to the Apache Software Foundation (ASF) under one
    * or more contributor license agreements.  See the NOTICE file
    * distributed with this work for additional information
    * regarding copyright ownership.  The ASF licenses this file
    * to you under the Apache License, Version 2.0 (the
    * "License"); you may not use this file except in compliance
    * with the License.  You may obtain a copy of the License at
    *
    *     http://www.apache.org/licenses/LICENSE-2.0
    *
    * Unless required by applicable law or agreed to in writing, software
    * distributed under the License is distributed on an "AS IS" BASIS,
    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    * See the License for the specific language governing permissions and
    * limitations under the License.
    */ 

##########
File path: src/main/java/org/apache/cassandra/sidecar/CQLSession.java
##########
@@ -0,0 +1,115 @@
+package org.apache.cassandra.sidecar;
+
+import java.net.InetSocketAddress;
+import java.util.Collections;
+
+import javax.annotation.Nullable;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.NettyOptions;
+import com.datastax.driver.core.QueryOptions;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.policies.ExponentialReconnectionPolicy;
+import com.datastax.driver.core.policies.ReconnectionPolicy;
+import com.datastax.driver.core.policies.RoundRobinPolicy;
+import com.datastax.driver.core.policies.WhiteListPolicy;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+/**
+ * Represents a connection to Cassandra cluster. Currently supports returning the local connection only as
+ * defined in the Configuration.
+ */
+@Singleton
+public class CQLSession
+{
+    private static final Logger logger = LoggerFactory.getLogger(CQLSession.class);
+    @Nullable
+    private Session localSession;
+    private final InetSocketAddress inet;
+    private final WhiteListPolicy wlp;
+    private NettyOptions nettyOptions;
+    private QueryOptions queryOptions;
+    private ReconnectionPolicy reconnectionPolicy;
+
+    @Inject
+    public CQLSession(Configuration configuration)
+    {
+        inet = InetSocketAddress.createUnresolved(configuration.getCassandraHost(), configuration.getCassandraPort());
+        wlp = new WhiteListPolicy(new RoundRobinPolicy(), Collections.singletonList(inet));
+        this.nettyOptions = new NettyOptions();
+        this.queryOptions = new QueryOptions().setConsistencyLevel(ConsistencyLevel.ONE);
+        this.reconnectionPolicy = new ExponentialReconnectionPolicy(1000,
+                configuration.getHealthCheckFrequencyMillis());
+    }
+
+    @VisibleForTesting
+    CQLSession(InetSocketAddress target, NettyOptions options)
+    {
+        inet = target;
+        wlp = new WhiteListPolicy(new RoundRobinPolicy(), Collections.singletonList(inet));
+        this.nettyOptions = options;
+        this.queryOptions = new QueryOptions().setConsistencyLevel(ConsistencyLevel.ONE);
+        reconnectionPolicy = new ExponentialReconnectionPolicy(100, 1000);
+    }
+
+    /**
+     * Provides a Session connected only to the local node from configuration. If null it means the connection was
+     * not able to be established. The session still might throw a NoHostAvailableException if the local host goes
+     * offline or otherwise unavailable.
+     *
+     * @return Session
+     */
+    @Nullable
+    public synchronized Session getLocalCql()
+    {
+        Cluster cluster = null;
+        try
+        {
+            if (localSession == null)
+            {
+                cluster = Cluster.builder()
+                        .addContactPointsWithPorts(inet)
+                        .withLoadBalancingPolicy(wlp)
+                        .withQueryOptions(queryOptions)
+                        .withReconnectionPolicy(reconnectionPolicy)
+                        .withoutMetrics()
+                        // tests can create a lot of these Cluster objects, to avoid creating HWTs and
+                        // event thread pools for each we have the override
+                        .withNettyOptions(nettyOptions)
+                        .build();
+                localSession = cluster.connect();
+            }
+        }
+        catch (Exception e)
+        {
+            logger.debug("Failed to reach Cassandra", e);

Review comment:
       I think the logging level at https://github.com/apache/cassandra-sidecar/blob/trunk/common/src/main/java/org/apache/cassandra/sidecar/common/CQLSession.java#L108 should also be error, not debug.

##########
File path: src/main/java/org/apache/cassandra/sidecar/CQLSession.java
##########
@@ -0,0 +1,115 @@
+package org.apache.cassandra.sidecar;
+
+import java.net.InetSocketAddress;
+import java.util.Collections;
+
+import javax.annotation.Nullable;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.NettyOptions;
+import com.datastax.driver.core.QueryOptions;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.policies.ExponentialReconnectionPolicy;
+import com.datastax.driver.core.policies.ReconnectionPolicy;
+import com.datastax.driver.core.policies.RoundRobinPolicy;
+import com.datastax.driver.core.policies.WhiteListPolicy;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+/**
+ * Represents a connection to Cassandra cluster. Currently supports returning the local connection only as
+ * defined in the Configuration.
+ */
+@Singleton
+public class CQLSession

Review comment:
       We already have a CQLSession class in the common directory.

##########
File path: src/main/dist/conf/sidecar.yaml
##########
@@ -24,3 +24,6 @@ sidecar:
 
 healthcheck:
   - poll_freq_millis: 30000
+
+cdc:

Review comment:
       I think this could be renamed from "cdc" to something like "cassandra config file", because we may have other uses for the cassandra.yaml path besides CDC.

##########
File path: src/main/java/org/apache/cassandra/sidecar/MainModule.java
##########
@@ -151,6 +163,7 @@ public Configuration configuration() throws ConfigurationException, IOException
                     .setTrustStorePath(yamlConf.get(String.class, "sidecar.ssl.truststore.path", null))
                     .setTrustStorePassword(yamlConf.get(String.class, "sidecar.ssl.truststore.password", null))
                     .setSslEnabled(yamlConf.get(Boolean.class, "sidecar.ssl.enabled", false))
+                    .setCassandraConfigPath(yamlConf.get(String.class, "cdc.configPath"))

Review comment:
       As I said before, the Cassandra config path could also go under the "cassandra" section of sidecar.yaml.

##########
File path: src/main/java/org/apache/cassandra/sidecar/Configuration.java
##########
@@ -55,12 +55,17 @@
 
     private final boolean isSslEnabled;
 
+    /* Cassandra server conf path */
+    @Nullable
+    private String cassandraConfigPath;

Review comment:
       Why can cassandraConfigPath be null?

##########
File path: src/main/java/org/apache/cassandra/sidecar/cdc/CassandraConfig.java
##########
@@ -0,0 +1,68 @@
+package org.apache.cassandra.sidecar.cdc;
+
+import java.io.File;
+import java.nio.file.Paths;
+import java.util.function.Supplier;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.YamlConfigurationLoader;
+import org.apache.cassandra.sidecar.Configuration;
+
+/**

Review comment:
       I think this file should live in the sidecar directory alongside Configuration.java, or, better still, the Cassandra configuration could be merged into Configuration.java.

##########
File path: src/main/java/org/apache/cassandra/sidecar/CassandraSidecarDaemon.java
##########
@@ -38,19 +38,22 @@
     private static final Logger logger = LoggerFactory.getLogger(CassandraSidecarDaemon.class);
     private final HttpServer server;
     private final Configuration config;
+    private final CDCReaderService cdcReaderService;
 
     @Inject
-    public CassandraSidecarDaemon(HttpServer server, Configuration config)
+    public CassandraSidecarDaemon(HttpServer server, Configuration config, CDCReaderService cdcReaderService)
     {
         this.server = server;
         this.config = config;
+        this.cdcReaderService = cdcReaderService;
     }
 
     public void start()
     {
         banner(System.out);
         validate();
         logger.info("Starting Cassandra Sidecar on {}:{}", config.getHost(), config.getPort());
+        cdcReaderService.start();

Review comment:
       I also think we could add a common method in which all the other services are started,
   such as startInitService(). To me, cdcReaderService is a service that starts when the sidecar initializes, and we can use a flag to
   indicate whether the service starts or not.
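
As a rough illustration of the suggestion above, a single start-up method could walk over the registered services and start only those whose flag is set. This is a minimal sketch, not code from the PR: `StartupService`, `StartupServices`, `FlaggedService` and `startInitServices()` are hypothetical names, and the flag is assumed to mean "enabled in the sidecar configuration".

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical contract for anything the daemon starts at boot.
// The PR's CDCReaderService exposes start(); name() and isEnabled() are assumptions.
interface StartupService
{
    String name();

    boolean isEnabled();

    void start();
}

// Collects services and starts only the enabled ones, in registration order.
class StartupServices
{
    private final List<StartupService> services = new ArrayList<>();

    StartupServices register(StartupService service)
    {
        services.add(service);
        return this;
    }

    // Stand-in for the suggested startInitService() method.
    // Returns the names of the services that were actually started.
    List<String> startInitServices()
    {
        List<String> started = new ArrayList<>();
        for (StartupService service : services)
        {
            if (service.isEnabled())
            {
                service.start();
                started.add(service.name());
            }
        }
        return started;
    }
}

// Trivial flag-driven service, used here only to exercise the sketch.
class FlaggedService implements StartupService
{
    private final String name;
    private final boolean enabled;
    private boolean running;

    FlaggedService(String name, boolean enabled)
    {
        this.name = name;
        this.enabled = enabled;
    }

    public String name()
    {
        return name;
    }

    public boolean isEnabled()
    {
        return enabled;
    }

    public void start()
    {
        running = true;
    }

    boolean isRunning()
    {
        return running;
    }
}
```

With something like this, `CassandraSidecarDaemon.start()` would make one call to `startInitServices()` instead of calling each service by hand, and a disabled CDC reader would simply be skipped.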

##########
File path: src/main/java/org/apache/cassandra/sidecar/cdc/CDCBookmark.java
##########
@@ -0,0 +1,213 @@
+package org.apache.cassandra.sidecar.cdc;
+

Review comment:
       Should we add the Apache license header?

##########
File path: src/main/java/org/apache/cassandra/sidecar/CassandraSidecarDaemon.java
##########
@@ -38,19 +38,22 @@
     private static final Logger logger = LoggerFactory.getLogger(CassandraSidecarDaemon.class);
     private final HttpServer server;
     private final Configuration config;
+    private final CDCReaderService cdcReaderService;
 
     @Inject
-    public CassandraSidecarDaemon(HttpServer server, Configuration config)
+    public CassandraSidecarDaemon(HttpServer server, Configuration config, CDCReaderService cdcReaderService)
     {
         this.server = server;
         this.config = config;
+        this.cdcReaderService = cdcReaderService;
     }
 
     public void start()
     {
         banner(System.out);
         validate();
         logger.info("Starting Cassandra Sidecar on {}:{}", config.getHost(), config.getPort());
+        cdcReaderService.start();

Review comment:
       Also, do you think we should add a flag to enable or disable the CDC reader service?

##########
File path: src/main/dist/conf/sidecar.yaml
##########
@@ -24,3 +24,6 @@ sidecar:
 
 healthcheck:
   - poll_freq_millis: 30000
+
+cdc:

Review comment:
       I also think this config path should be placed under "cassandra:", which is a top-level section of sidecar.yaml.

##########
File path: src/main/java/org/apache/cassandra/sidecar/CassandraSidecarDaemon.java
##########
@@ -38,19 +38,22 @@
     private static final Logger logger = LoggerFactory.getLogger(CassandraSidecarDaemon.class);
     private final HttpServer server;
     private final Configuration config;
+    private final CDCReaderService cdcReaderService;
 
     @Inject
-    public CassandraSidecarDaemon(HttpServer server, Configuration config)
+    public CassandraSidecarDaemon(HttpServer server, Configuration config, CDCReaderService cdcReaderService)
     {
         this.server = server;
         this.config = config;
+        this.cdcReaderService = cdcReaderService;
     }
 
     public void start()
     {
         banner(System.out);
         validate();
         logger.info("Starting Cassandra Sidecar on {}:{}", config.getHost(), config.getPort());
+        cdcReaderService.start();

Review comment:
       Add a log statement for the CDC reader service?

##########
File path: src/main/java/org/apache/cassandra/sidecar/CQLSession.java
##########
@@ -0,0 +1,115 @@
+package org.apache.cassandra.sidecar;
+
+import java.net.InetSocketAddress;
+import java.util.Collections;
+
+import javax.annotation.Nullable;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.NettyOptions;
+import com.datastax.driver.core.QueryOptions;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.policies.ExponentialReconnectionPolicy;
+import com.datastax.driver.core.policies.ReconnectionPolicy;
+import com.datastax.driver.core.policies.RoundRobinPolicy;
+import com.datastax.driver.core.policies.WhiteListPolicy;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+/**
+ * Represents a connection to Cassandra cluster. Currently supports returning the local connection only as
+ * defined in the Configuration.
+ */
+@Singleton
+public class CQLSession

Review comment:
       I think we can use this class






[GitHub] [cassandra-sidecar] rustyrazorblade commented on a change in pull request #18: CDC reader in Apache Cassandra Sidecar - CASSANDRASC-27

Posted by GitBox <gi...@apache.org>.
rustyrazorblade commented on a change in pull request #18:
URL: https://github.com/apache/cassandra-sidecar/pull/18#discussion_r526488137



##########
File path: src/main/java/org/apache/cassandra/sidecar/MainModule.java
##########
@@ -126,6 +130,14 @@ public Router vertxRouter(Vertx vertx)
         return router;
     }
 
+    @Override
+    protected void  configure()
+    {
+        // TODO: Make the output type configurable
+        bind(CDCReaderMonitor.class).to(CDCReaderMonitorLogger.class);
+        bind(Output.class).to(ConsoleOutput.class);

Review comment:
       I'm not seeing an advantage to the request, @Maxwell-Guo.  The approach @tharanga took is pretty standard for a Guice binding.

##########
File path: build.gradle
##########
@@ -179,6 +179,7 @@ dependencies {
 
     compile project(":common")
     compile project(":cassandra40")
+    compile 'org.apache.cassandra:cassandra-all:4.0-beta2'

Review comment:
       If you take a look at the last commit I added, I spent a lot of time trying to decouple the sidecar from using a specific version of Cassandra.  Each version we decide to support can (and will) have an adapter, allowing us to maintain a single sidecar project that can work with different versions of Cassandra, each of which has a different implementation.  There's no assurance that C* 5.0 will have the same CDC implementation as the 4.0 version.  Could you please move the version-specific logic into the cassandra40 subproject?
   
   In addition, we may want to have the user point to their cassandra lib directory as well in order to not ship every version of C* with the sidecar.  That will give us the flexibility for folks to use their own builds (private or public) as well as ship a smaller artifact.  Since everyone *has* to run Cassandra I think this is a fair ask.  Using a `compileOnly` dependency would allow us to test against each version of C* without shipping the jars.
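
To make the adapter idea concrete, here is a minimal sketch of how version-specific CDC logic could sit behind a common interface and be selected by version. It is an assumption-laden illustration, not the sidecar's actual adapter API: `CdcReaderAdapter`, `Cassandra40CdcReader` and `CdcAdapterRegistry` are hypothetical names, and a real adapter would wrap the version-specific Cassandra classes inside the cassandra40 subproject.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical per-version contract; each supported Cassandra version supplies its own implementation.
interface CdcReaderAdapter
{
    String describe();
}

// Would live in the cassandra40 subproject and be the only place that touches 4.0 classes.
class Cassandra40CdcReader implements CdcReaderAdapter
{
    public String describe()
    {
        return "CDC reader for Cassandra 4.0";
    }
}

// Maps a "major.minor" key to the adapter that handles it.
class CdcAdapterRegistry
{
    private final Map<String, CdcReaderAdapter> byMajorMinor = new HashMap<>();

    void register(String majorMinor, CdcReaderAdapter adapter)
    {
        byMajorMinor.put(majorMinor, adapter);
    }

    // Looks up an adapter using the "major.minor" prefix of a version such as "4.0-beta2".
    Optional<CdcReaderAdapter> forVersion(String version)
    {
        String[] parts = version.split("[.\\-]");
        if (parts.length < 2)
        {
            return Optional.empty();
        }
        return Optional.ofNullable(byMajorMinor.get(parts[0] + "." + parts[1]));
    }
}
```

The core sidecar module would then depend only on `CdcReaderAdapter`, and an unsupported version would yield an empty result rather than a class-loading failure.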






[GitHub] [cassandra-sidecar] tharanga commented on a change in pull request #18: CDC reader in Apache Cassandra Sidecar - CASSANDRASC-27

Posted by GitBox <gi...@apache.org>.
tharanga commented on a change in pull request #18:
URL: https://github.com/apache/cassandra-sidecar/pull/18#discussion_r517716416



##########
File path: src/main/java/org/apache/cassandra/sidecar/Configuration.java
##########
@@ -55,12 +55,17 @@
 
     private final boolean isSslEnabled;
 
+    /* Cassandra server conf path */
+    @Nullable
+    private String cassandraConfigPath;

Review comment:
       I originally allowed null so that the CDC reader wouldn't start. However, now that we are adding a config to enable/disable it, this no longer needs to be a nullable field.






[GitHub] [cassandra-sidecar] Maxwell-Guo commented on a change in pull request #18: CDC reader in Apache Cassandra Sidecar - CASSANDRASC-27

Posted by GitBox <gi...@apache.org>.
Maxwell-Guo commented on a change in pull request #18:
URL: https://github.com/apache/cassandra-sidecar/pull/18#discussion_r514076434



##########
File path: src/main/java/org/apache/cassandra/sidecar/cdc/MutationHandler.java
##########
@@ -0,0 +1,121 @@
+package org.apache.cassandra.sidecar.cdc;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.google.inject.Inject;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.commitlog.CommitLogDescriptor;
+import org.apache.cassandra.db.commitlog.CommitLogPosition;
+import org.apache.cassandra.db.commitlog.CommitLogReadHandler;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.sidecar.Configuration;
+import org.apache.cassandra.sidecar.cdc.output.Output;
+import org.apache.cassandra.sidecar.metrics.cdc.CDCReaderMonitor;
+/**
+ * Implements Cassandra CommitLogReadHandler, handles mutations read from Cassandra commit logs.
+ */
+public class MutationHandler implements CommitLogReadHandler
+{
+    private static final Logger logger = LoggerFactory.getLogger(MutationHandler.class);
+    private Output output;
+    Future<CommitLogPosition> mutationFuture = null;
+    private ExecutorService executor;
+    private CDCReaderMonitor monitor;
+    private CDCBookmark bookmark;

Review comment:
       We could also add "private Configuration conf;" here and assign it from the Configuration passed to the constructor, so that the configuration can be used later.






[GitHub] [cassandra-sidecar] tharanga commented on a change in pull request #18: CDC reader in Apache Cassandra Sidecar - CASSANDRASC-27

Posted by GitBox <gi...@apache.org>.
tharanga commented on a change in pull request #18:
URL: https://github.com/apache/cassandra-sidecar/pull/18#discussion_r517714167



##########
File path: src/main/java/org/apache/cassandra/sidecar/CassandraSidecarDaemon.java
##########
@@ -38,19 +38,22 @@
     private static final Logger logger = LoggerFactory.getLogger(CassandraSidecarDaemon.class);
     private final HttpServer server;
     private final Configuration config;
+    private final CDCReaderService cdcReaderService;
 
     @Inject
-    public CassandraSidecarDaemon(HttpServer server, Configuration config)
+    public CassandraSidecarDaemon(HttpServer server, Configuration config, CDCReaderService cdcReaderService)
     {
         this.server = server;
         this.config = config;
+        this.cdcReaderService = cdcReaderService;
     }
 
     public void start()
     {
         banner(System.out);
         validate();
         logger.info("Starting Cassandra Sidecar on {}:{}", config.getHost(), config.getPort());
+        cdcReaderService.start();

Review comment:
       Good suggestion. Let me add that, so users who don't need this can just keep it disabled. 






[GitHub] [cassandra-sidecar] tharanga commented on a change in pull request #18: CDC reader in Apache Cassandra Sidecar - CASSANDRASC-27

Posted by GitBox <gi...@apache.org>.
tharanga commented on a change in pull request #18:
URL: https://github.com/apache/cassandra-sidecar/pull/18#discussion_r517713916



##########
File path: src/main/java/org/apache/cassandra/sidecar/CassandraSidecarDaemon.java
##########
@@ -38,19 +38,22 @@
     private static final Logger logger = LoggerFactory.getLogger(CassandraSidecarDaemon.class);
     private final HttpServer server;
     private final Configuration config;
+    private final CDCReaderService cdcReaderService;
 
     @Inject
-    public CassandraSidecarDaemon(HttpServer server, Configuration config)
+    public CassandraSidecarDaemon(HttpServer server, Configuration config, CDCReaderService cdcReaderService)
     {
         this.server = server;
         this.config = config;
+        this.cdcReaderService = cdcReaderService;
     }
 
     public void start()
     {
         banner(System.out);
         validate();
         logger.info("Starting Cassandra Sidecar on {}:{}", config.getHost(), config.getPort());
+        cdcReaderService.start();

Review comment:
       You mean a log saying the CDCReaderService started? There is such a log statement in that class: `logger.info("Successfully started the CDC reader");`






[GitHub] [cassandra-sidecar] Maxwell-Guo commented on a change in pull request #18: CDC reader in Apache Cassandra Sidecar - CASSANDRASC-27

Posted by GitBox <gi...@apache.org>.
Maxwell-Guo commented on a change in pull request #18:
URL: https://github.com/apache/cassandra-sidecar/pull/18#discussion_r512645047



##########
File path: src/main/java/org/apache/cassandra/sidecar/cdc/CassandraConfig.java
##########
@@ -0,0 +1,68 @@
+package org.apache.cassandra.sidecar.cdc;
+
+import java.io.File;
+import java.nio.file.Paths;
+import java.util.function.Supplier;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.YamlConfigurationLoader;
+import org.apache.cassandra.sidecar.Configuration;
+
+/**

Review comment:
       I think this file should live in the sidecar directory alongside Configuration.java, or, better still, the Cassandra configuration could be merged into Configuration.java.






[GitHub] [cassandra-sidecar] tharanga commented on a change in pull request #18: CDC reader in Apache Cassandra Sidecar - CASSANDRASC-27

Posted by GitBox <gi...@apache.org>.
tharanga commented on a change in pull request #18:
URL: https://github.com/apache/cassandra-sidecar/pull/18#discussion_r517722085



##########
File path: src/main/java/org/apache/cassandra/sidecar/cdc/MutationHandler.java
##########
@@ -0,0 +1,121 @@
+package org.apache.cassandra.sidecar.cdc;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.google.inject.Inject;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.commitlog.CommitLogDescriptor;
+import org.apache.cassandra.db.commitlog.CommitLogPosition;
+import org.apache.cassandra.db.commitlog.CommitLogReadHandler;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.sidecar.Configuration;
+import org.apache.cassandra.sidecar.cdc.output.Output;
+import org.apache.cassandra.sidecar.metrics.cdc.CDCReaderMonitor;
+/**
+ * Implements Cassandra CommitLogReadHandler, handles mutations read from Cassandra commit logs.
+ */
+public class MutationHandler implements CommitLogReadHandler
+{
+    private static final Logger logger = LoggerFactory.getLogger(MutationHandler.class);
+    private Output output;
+    Future<CommitLogPosition> mutationFuture = null;
+    private ExecutorService executor;
+    private CDCReaderMonitor monitor;
+    private CDCBookmark bookmark;

Review comment:
       +1, dead code, removing it.






[GitHub] [cassandra-sidecar] Maxwell-Guo commented on a change in pull request #18: CDC reader in Apache Cassandra Sidecar - CASSANDRASC-27

Posted by GitBox <gi...@apache.org>.
Maxwell-Guo commented on a change in pull request #18:
URL: https://github.com/apache/cassandra-sidecar/pull/18#discussion_r514074195



##########
File path: src/main/java/org/apache/cassandra/sidecar/cdc/CassandraConfig.java
##########
@@ -0,0 +1,68 @@
+package org.apache.cassandra.sidecar.cdc;
+
+import java.io.File;
+import java.nio.file.Paths;
+import java.util.function.Supplier;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.YamlConfigurationLoader;
+import org.apache.cassandra.sidecar.Configuration;
+
+/**
+ * Custom Cassandra configurator
+ */
+@Singleton
+public class CassandraConfig implements Supplier<Config>
+{
+    private Config config;
+
+    @Inject
+    public CassandraConfig(Configuration config)

Review comment:
       I think the input could just be a string holding the Cassandra config file path, which can also be injected easily.
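
A sketch of what narrowing the constructor input to a plain path string might look like. This is only an illustration under stated assumptions: `CassandraConfigPath` is a hypothetical stand-in for the PR's `CassandraConfig`, it resolves the path but does not parse any YAML, and in the real class the string would presumably be supplied through a Guice binding rather than passed by hand.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Objects;
import java.util.function.Supplier;

// Hypothetical stand-in: depends only on the config file path, not on the whole sidecar Configuration.
class CassandraConfigPath implements Supplier<Path>
{
    private final Path path;

    CassandraConfigPath(String cassandraConfigPath)
    {
        // Fail fast with a clear message instead of surfacing a null later during loading.
        Objects.requireNonNull(cassandraConfigPath, "cassandraConfigPath must not be null");
        this.path = Paths.get(cassandraConfigPath).toAbsolutePath().normalize();
    }

    // Returns the absolute, normalized location of cassandra.yaml.
    public Path get()
    {
        return path;
    }
}
```

Taking only the string keeps the class easy to construct in tests, since no full `Configuration` object has to be built first.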






[GitHub] [cassandra-sidecar] tharanga commented on a change in pull request #18: CDC reader in Apache Cassandra Sidecar - CASSANDRASC-27

Posted by GitBox <gi...@apache.org>.
tharanga commented on a change in pull request #18:
URL: https://github.com/apache/cassandra-sidecar/pull/18#discussion_r523128176



##########
File path: src/main/java/org/apache/cassandra/sidecar/cdc/CDCRawDirectoryMonitor.java
##########
@@ -0,0 +1,90 @@
+package org.apache.cassandra.sidecar.cdc;
+
+import java.io.IOException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Timer;
+import java.util.TimerTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.sidecar.metrics.cdc.CDCReaderMonitor;
+
+/**
+ * Monitors the cdc_raw directory, cleanup unused commit logs and report metrics
+ * */
+@Singleton
+public class CDCRawDirectoryMonitor extends TimerTask
+{
+
+    private final Timer timer;
+    private final CDCReaderMonitor monitor;
+    private volatile boolean running;
+    private static final Logger logger = LoggerFactory.getLogger(CDCRawDirectoryMonitor.class);
+
+    @Inject
+    CDCRawDirectoryMonitor(CDCReaderMonitor monitor)
+    {
+        this.timer = new Timer();
+        this.monitor = monitor;
+        this.running = false;
+    }
+
+    /**
+     * Starts the background thread to monitor the cdc_raw dir.
+     * */
+    public void startMonitoring()
+    {
+        this.running = true;
+        timer.schedule(this, 0, DatabaseDescriptor.getCDCDiskCheckInterval());
+    }
+
+    @Override
+    public void run()
+    {
+        if (!this.running)
+        {
+            return;
+        }
+        // TODO : Don't be someone who just complains, do some useful work, clean files older than
+        //  the last persisted bookmark.
+        this.monitor.reportCdcRawDirectorySizeInBytes(getCdcRawDirectorySize());

Review comment:
       We do delete commit logs after reading them: https://github.com/apache/cassandra-sidecar/pull/18/files#diff-32afd2c4bf3fe7d4c3268bec08a902cf36fb92b079d9c99fe47bafe0b073d5ceR289. However, if the output is blocked or slow for some reason (e.g. a Kafka node is down), commit logs can pile up and eventually halt Cassandra writes. That is the default behavior, but we don't want it.
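
The pile-up concern can be sketched as a size check on the cdc_raw directory: sum the file sizes and flag when usage approaches a configured limit. This is a minimal sketch under assumptions, not the PR's implementation: `CdcRawSpaceCheck`, `isNearLimit` and the threshold fraction are hypothetical, and the limit is assumed to come from whatever space cap Cassandra is configured with for CDC.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;

class CdcRawSpaceCheck
{
    // Sums the sizes of the regular files directly under dir (non-recursive).
    static long directorySizeInBytes(Path dir) throws IOException
    {
        long total = 0;
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir))
        {
            for (Path file : stream)
            {
                if (Files.isRegularFile(file))
                {
                    total += Files.size(file);
                }
            }
        }
        return total;
    }

    // True when usage has reached the given fraction (0.0 to 1.0) of the limit.
    static boolean isNearLimit(long usedBytes, long limitBytes, double fraction)
    {
        return usedBytes >= (long) (limitBytes * fraction);
    }
}
```

A monitor running on a timer could report the size as a metric and, once `isNearLimit` returns true, warn or start cleaning up files older than the last persisted bookmark before Cassandra begins rejecting writes.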




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] [cassandra-sidecar] Maxwell-Guo commented on a change in pull request #18: CDC reader in Apache Cassandra Sidecar - CASSANDRASC-27

Posted by GitBox <gi...@apache.org>.
Maxwell-Guo commented on a change in pull request #18:
URL: https://github.com/apache/cassandra-sidecar/pull/18#discussion_r514116312



##########
File path: src/main/java/org/apache/cassandra/sidecar/cdc/MutationHandler.java
##########
@@ -0,0 +1,121 @@
+package org.apache.cassandra.sidecar.cdc;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.google.inject.Inject;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.commitlog.CommitLogDescriptor;
+import org.apache.cassandra.db.commitlog.CommitLogPosition;
+import org.apache.cassandra.db.commitlog.CommitLogReadHandler;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.sidecar.Configuration;
+import org.apache.cassandra.sidecar.cdc.output.Output;
+import org.apache.cassandra.sidecar.metrics.cdc.CDCReaderMonitor;
+/**
+ * Implements Cassandra CommitLogReadHandler, handles mutations read from Cassandra commit logs.
+ */
+public class MutationHandler implements CommitLogReadHandler
+{
+    private static final Logger logger = LoggerFactory.getLogger(MutationHandler.class);
+    private Output output;
+    Future<CommitLogPosition> mutationFuture = null;
+    private ExecutorService executor;
+    private CDCReaderMonitor monitor;
+    private CDCBookmark bookmark;

Review comment:
       Alternatively, could we just remove the `Configuration conf` input parameter?






[GitHub] [cassandra-sidecar] tharanga commented on a change in pull request #18: CDC reader in Apache Cassandra Sidecar - CASSANDRASC-27

Posted by GitBox <gi...@apache.org>.
tharanga commented on a change in pull request #18:
URL: https://github.com/apache/cassandra-sidecar/pull/18#discussion_r529092869



##########
File path: build.gradle
##########
@@ -179,6 +179,7 @@ dependencies {
 
     compile project(":common")
     compile project(":cassandra40")
+    compile 'org.apache.cassandra:cassandra-all:4.0-beta2'

Review comment:
       @rustyrazorblade +1 for both suggestions. I was thinking of punting this to a future commit, but I see the work you've done at `a4805a910904019698ae373ac33f88855cf67f3d`. Let me refactor this code to address both points.






[GitHub] [cassandra-sidecar] tharanga commented on a change in pull request #18: CDC reader in Apache Cassandra Sidecar - CASSANDRASC-27

Posted by GitBox <gi...@apache.org>.
tharanga commented on a change in pull request #18:
URL: https://github.com/apache/cassandra-sidecar/pull/18#discussion_r517713321



##########
File path: src/main/java/org/apache/cassandra/sidecar/CQLSession.java
##########
@@ -0,0 +1,115 @@
+package org.apache.cassandra.sidecar;
+
+import java.net.InetSocketAddress;
+import java.util.Collections;
+
+import javax.annotation.Nullable;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.NettyOptions;
+import com.datastax.driver.core.QueryOptions;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.policies.ExponentialReconnectionPolicy;
+import com.datastax.driver.core.policies.ReconnectionPolicy;
+import com.datastax.driver.core.policies.RoundRobinPolicy;
+import com.datastax.driver.core.policies.WhiteListPolicy;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+/**
+ * Represents a connection to Cassandra cluster. Currently supports returning the local connection only as
+ * defined in the Configuration.
+ */
+@Singleton
+public class CQLSession
+{
+    private static final Logger logger = LoggerFactory.getLogger(CQLSession.class);
+    @Nullable
+    private Session localSession;
+    private final InetSocketAddress inet;
+    private final WhiteListPolicy wlp;
+    private NettyOptions nettyOptions;
+    private QueryOptions queryOptions;
+    private ReconnectionPolicy reconnectionPolicy;
+
+    @Inject
+    public CQLSession(Configuration configuration)
+    {
+        inet = InetSocketAddress.createUnresolved(configuration.getCassandraHost(), configuration.getCassandraPort());
+        wlp = new WhiteListPolicy(new RoundRobinPolicy(), Collections.singletonList(inet));
+        this.nettyOptions = new NettyOptions();
+        this.queryOptions = new QueryOptions().setConsistencyLevel(ConsistencyLevel.ONE);
+        this.reconnectionPolicy = new ExponentialReconnectionPolicy(1000,
+                configuration.getHealthCheckFrequencyMillis());
+    }
+
+    @VisibleForTesting
+    CQLSession(InetSocketAddress target, NettyOptions options)
+    {
+        inet = target;
+        wlp = new WhiteListPolicy(new RoundRobinPolicy(), Collections.singletonList(inet));
+        this.nettyOptions = options;
+        this.queryOptions = new QueryOptions().setConsistencyLevel(ConsistencyLevel.ONE);
+        reconnectionPolicy = new ExponentialReconnectionPolicy(100, 1000);
+    }
+
+    /**
+     * Provides a Session connected only to the local node from configuration. If null it means the connection was
+     * not able to be established. The session still might throw a NoHostAvailableException if the local host goes
+     * offline or otherwise unavailable.
+     *
+     * @return Session
+     */
+    @Nullable
+    public synchronized Session getLocalCql()
+    {
+        Cluster cluster = null;
+        try
+        {
+            if (localSession == null)
+            {
+                cluster = Cluster.builder()
+                        .addContactPointsWithPorts(inet)
+                        .withLoadBalancingPolicy(wlp)
+                        .withQueryOptions(queryOptions)
+                        .withReconnectionPolicy(reconnectionPolicy)
+                        .withoutMetrics()
+                        // tests can create a lot of these Cluster objects, to avoid creating HWTs and
+                        // event thread pools for each we have the override
+                        .withNettyOptions(nettyOptions)
+                        .build();
+                localSession = cluster.connect();
+            }
+        }
+        catch (Exception e)
+        {
+            logger.debug("Failed to reach Cassandra", e);

Review comment:
       I removed this class and started using the common CQLSession.






[GitHub] [cassandra-sidecar] Maxwell-Guo commented on a change in pull request #18: CDC reader in Apache Cassandra Sidecar - CASSANDRASC-27

Posted by GitBox <gi...@apache.org>.
Maxwell-Guo commented on a change in pull request #18:
URL: https://github.com/apache/cassandra-sidecar/pull/18#discussion_r521982181



##########
File path: src/main/java/org/apache/cassandra/sidecar/cdc/CDCBookmark.java
##########
@@ -0,0 +1,213 @@
+package org.apache.cassandra.sidecar.cdc;
+
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.locks.ReentrantLock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.commitlog.CommitLogDescriptor;
+import org.apache.cassandra.db.commitlog.CommitLogPosition;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.sidecar.Configuration;
+
+/**
+ * Manages the CDC reader bookmark. This tracks the last successfully processed offset
+ * of a commit log.
+ */
+@Singleton
+public class CDCBookmark extends TimerTask
+{
+    // Tracks last disk sync'd commit log position.
+    private CommitLogPosition lastSyncedPosition;
+    // Tracks last successfully processed commit log position by the CDC reader.
+    private CommitLogPosition lastProcessedPosition;
+    private final Timer timer;
+    private static final String BOOKMARK = "CdcReader.bookmark";
+    private static final Logger logger = LoggerFactory.getLogger(CDCBookmark.class);
+    private final ReentrantLock bookmarkLock = new ReentrantLock();
+    private final Configuration conf;
+
+    @Inject
+    CDCBookmark(Configuration conf)
+    {
+        this.lastSyncedPosition = null;
+        this.lastProcessedPosition = null;
+        this.conf = conf;
+        this.timer = new Timer();
+    }
+
+    /**
+     * Persists last successfully processed commit log offset to the disk.
+     */
+    public void syncBookmark()
+    {
+        CommitLogPosition lastPosition = this.getLastProcessedPosition();
+
+        if (lastPosition == null)
+        {
+            return;
+        }
+        logger.debug("Last processed bookmark {}", this.lastProcessedPosition.toString());
+        try
+        {
+            if (lastPosition.equals(this.lastSyncedPosition))
+            {
+                return;
+            }
+
+            CommitLogPosition.CommitLogPositionSerializer serializer =
+                    new CommitLogPosition.CommitLogPositionSerializer();
+
+            // TODO: JSON ser-de and write-rename instead of writing directly to the bookmark
+            try (FileOutputStream fileOutputStream = new FileOutputStream(
+                    new File(this.getBookmarkPath())))
+            {
+                DataOutputPlus outBuffer = new DataOutputBuffer();
+                serializer.serialize(lastPosition, outBuffer);
+                fileOutputStream.write(((DataOutputBuffer) outBuffer).getData());
+                fileOutputStream.flush();
+                this.lastSyncedPosition = lastPosition;
+                logger.info("Successfully synced bookmark {} to the file {}", this.lastSyncedPosition.toString(),
+                        this.getBookmarkPath());
+            }
+            catch (IOException e)
+            {
+                logger.error("Error when writing bookmark {} to the file {}", lastPosition.toString(),
+                        this.getBookmarkPath());
+            }
+        }
+        catch (Exception ex)
+        {
+            logger.error("Sync exception {}", ex.getMessage());
+        }
+    }
+
+    /**
+     * Gets the path to the CDC reader bookmark.
+     *
+     * @return complete path to the bookmark file.
+     */
+    public String getBookmarkPath()
+    {
+        return String.format("%s/%s", DatabaseDescriptor.getCDCLogLocation(),
+                BOOKMARK);
+    }
+
+    @Override
+    public void run()
+    {
+        this.syncBookmark();
+    }
+
+    /**
+     * Gets the last successfully processed commit log offset.
+     * This method is thread safe.
+     *
+     * @return last successfully processed commit log offset.
+     */
+    public CommitLogPosition getLastProcessedPosition()
+    {
+        CommitLogPosition lastPosition = null;
+        try
+        {
+            bookmarkLock.lock();
+            if (this.lastProcessedPosition != null)
+            {
+                lastPosition = new CommitLogPosition(this.lastProcessedPosition.segmentId,
+                        this.lastProcessedPosition.position);
+
+            }
+        }
+        finally
+        {
+            bookmarkLock.unlock();
+        }
+        return lastPosition;
+    }
+
+    /**
+     * Sets the last successfully processed commit log offset.
+     * This method is thread safe.
+     *
+     */
+    public void setLastProcessedPosition(CommitLogPosition processedPosition)
+    {
+        try
+        {
+            bookmarkLock.lock();
+            this.lastProcessedPosition = processedPosition;
+        }
+        finally
+        {
+            bookmarkLock.unlock();
+        }
+    }
+
+    /**
+     * Starts the background thread to write processed commit log positions to the disk.
+     * */
+    public void startBookmarkSync()
+    {
+        timer.schedule(this, 0, DatabaseDescriptor.getCommitLogSyncPeriod());

Review comment:
       I don't know which JDK version you use. For me, the `schedule` function of the Timer class
   does not accept a delay argument less than 1; it must be greater than 0,
   or an exception is thrown.
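
For reference, the `java.util.Timer` Javadoc says `schedule(task, delay, period)` throws `IllegalArgumentException` when `delay < 0` or `period <= 0`, so a delay of 0 is accepted but a period of 0 is not. The small check below exercises those cases. The class `TimerScheduleCheck` is a hypothetical helper. That the exception described above comes from a non-positive period (for example a sync period of 0 in the loaded config) is only an assumption.

```java
import java.util.Timer;
import java.util.TimerTask;

/** Hypothetical helper for illustration; not part of the PR. */
final class TimerScheduleCheck
{
    private TimerScheduleCheck()
    {
    }

    /** Returns true if Timer.schedule accepts the given delay/period pair. */
    static boolean accepts(long delayMillis, long periodMillis)
    {
        Timer timer = new Timer(true); // daemon thread, so it never blocks JVM exit
        try
        {
            timer.schedule(new TimerTask()
            {
                @Override
                public void run()
                {
                    // no-op: only the argument validation matters here
                }
            }, delayMillis, periodMillis);
            return true;
        }
        catch (IllegalArgumentException e)
        {
            // Thrown for a negative delay or a non-positive period
            return false;
        }
        finally
        {
            timer.cancel();
        }
    }
}
```

If the period can legitimately be 0 in some configurations, validating it before calling `schedule` (and falling back to a sane default) would avoid the exception.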






[GitHub] [cassandra-sidecar] Maxwell-Guo commented on a change in pull request #18: CDC reader in Apache Cassandra Sidecar - CASSANDRASC-27

Posted by GitBox <gi...@apache.org>.
Maxwell-Guo commented on a change in pull request #18:
URL: https://github.com/apache/cassandra-sidecar/pull/18#discussion_r514077627



##########
File path: src/main/java/org/apache/cassandra/sidecar/cdc/CDCBookmark.java
##########
@@ -0,0 +1,213 @@
+package org.apache.cassandra.sidecar.cdc;
+
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.locks.ReentrantLock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.commitlog.CommitLogDescriptor;
+import org.apache.cassandra.db.commitlog.CommitLogPosition;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.sidecar.Configuration;
+
+/**
+ * Manages the CDC reader bookmark. This tracks the last successfully processed offset
+ * of a commit log.
+ */
+@Singleton
+public class CDCBookmark extends TimerTask
+{
+    // Tracks last disk sync'd commit log position.
+    private CommitLogPosition lastSyncedPosition;
+    // Tracks last successfully processed commit log position by the CDC reader.
+    private CommitLogPosition lastProcessedPosition;
+    private final Timer timer;
+    private static final String BOOKMARK = "CdcReader.bookmark";
+    private static final Logger logger = LoggerFactory.getLogger(CDCBookmark.class);
+    private final ReentrantLock bookmarkLock = new ReentrantLock();
+    private final Configuration conf;
+
+    @Inject
+    CDCBookmark(Configuration conf)

Review comment:
       `Configuration conf` is not used in this class. Will it be used in the future? The same applies to the MutationHandler class.






[GitHub] [cassandra-sidecar] tharanga commented on a change in pull request #18: CDC reader in Apache Cassandra Sidecar - CASSANDRASC-27

Posted by GitBox <gi...@apache.org>.
tharanga commented on a change in pull request #18:
URL: https://github.com/apache/cassandra-sidecar/pull/18#discussion_r523126042



##########
File path: src/main/java/org/apache/cassandra/sidecar/cdc/CDCReaderService.java
##########
@@ -0,0 +1,157 @@
+package org.apache.cassandra.sidecar.cdc;
+
+import java.io.InvalidObjectException;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Host;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import io.vertx.core.logging.Logger;
+import io.vertx.core.logging.LoggerFactory;
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.sidecar.CQLSession;
+
+/**
+ * Cassandra's real-time change data capture service.
+ */
+@Singleton
+public class CDCReaderService implements Host.StateListener
+{
+    private static final Logger logger = LoggerFactory.getLogger(CDCReaderService.class);
+    private final CDCIndexWatcher cdcIndexWatcher;
+    private final CDCRawDirectoryMonitor cdcRawDirectoryMonitor;
+    private final CQLSession session;
+    private final CassandraConfig cassandraConfig;
+
+    @Inject
+    public CDCReaderService(CDCIndexWatcher cdcIndexWatcher, CDCRawDirectoryMonitor monitor, CQLSession session,
+                            CassandraConfig cassandraConfig)
+    {
+        this.cdcRawDirectoryMonitor = monitor;
+        this.cdcIndexWatcher = cdcIndexWatcher;
+        this.session = session;
+        this.cassandraConfig = cassandraConfig;
+    }
+
+    public synchronized void start()
+    {
+        try
+        {
+            // Wait until the Cassandra server is UP to load configs and start subsystems. There's no guarantee the
+            // config is valid otherwise.
+            waitForCassandraServer();
+            Cluster cluster = session.getLocalCql().getCluster();
+            if (cluster == null)
+            {
+                throw new InvalidObjectException("Cannot connect to the local Cassandra node");
+            }
+
+            // Ensure Cassandra config is valid and remove mutable data paths from the config
+            // to ensure CDC reader doesn't accidentally step on Cassandra data.
+            this.cassandraConfig.init();
+            // TODO : Load metadata from the CQLSession.
+            Schema.instance.loadFromDisk(false);
+            this.cassandraConfig.muteConfigs();
+
+            for (String keySpace : Schema.instance.getKeyspaces())
+            {
+                logger.info("Keyspace : {}", keySpace);
+                KeyspaceMetadata keyspaceMetadata = Schema.instance.getKeyspaceMetadata(keySpace);
+                if (keyspaceMetadata == null)
+                {
+                    continue;
+                }
+                for (TableMetadata tableMetadata : keyspaceMetadata.tablesAndViews())
+                {

Review comment:
       This code was just added for debugging. Users can alter tables to enable CDC at any time, so the sidecar shouldn't wait. Also, whatever the user does is not visible to us without reloading metadata. This is where the `CDCSchemaChangeListener` is helpful.
   +1 for the suggestion. When `CDCSchemaChangeListener` is working, we can add a guard rail to `CDCIndexWatcher` so it won't read commit log entries when there are no tables with CDC enabled.
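
A minimal sketch of what such a guard rail could look like: a small registry that a schema change listener updates and that the watcher consults before reading commit log entries. The class `CdcTableRegistry` and its methods are hypothetical names for illustration. How `CDCSchemaChangeListener` and `CDCIndexWatcher` would actually be wired to it is an assumption.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical registry of CDC-enabled tables; not part of the PR. */
final class CdcTableRegistry
{
    // Thread-safe set of "keyspace.table" names that currently have cdc=true
    private final Set<String> cdcTables = ConcurrentHashMap.newKeySet();

    /** Called by a schema listener when a table is created or altered. */
    void onTableChanged(String keyspace, String table, boolean cdcEnabled)
    {
        String key = keyspace + "." + table;
        if (cdcEnabled)
        {
            cdcTables.add(key);
        }
        else
        {
            cdcTables.remove(key);
        }
    }

    /** Called by a schema listener when a table is dropped. */
    void onTableDropped(String keyspace, String table)
    {
        cdcTables.remove(keyspace + "." + table);
    }

    /** The watcher can skip reading commit logs while this returns false. */
    boolean shouldReadCommitLogs()
    {
        return !cdcTables.isEmpty();
    }
}
```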






[GitHub] [cassandra-sidecar] Maxwell-Guo commented on a change in pull request #18: (WIP) CDC reader in Apache Cassandra Sidecar - CASSANDRASC-27

Posted by GitBox <gi...@apache.org>.
Maxwell-Guo commented on a change in pull request #18:
URL: https://github.com/apache/cassandra-sidecar/pull/18#discussion_r530060479



##########
File path: build.gradle
##########
@@ -179,6 +179,7 @@ dependencies {
 
     compile project(":common")
     compile project(":cassandra40")
+    compile 'org.apache.cassandra:cassandra-all:4.0-beta2'

Review comment:
       @rustyrazorblade +1






[GitHub] [cassandra-sidecar] Maxwell-Guo commented on a change in pull request #18: CDC reader in Apache Cassandra Sidecar - CASSANDRASC-27

Posted by GitBox <gi...@apache.org>.
Maxwell-Guo commented on a change in pull request #18:
URL: https://github.com/apache/cassandra-sidecar/pull/18#discussion_r514037070



##########
File path: src/main/java/org/apache/cassandra/sidecar/cdc/output/Output.java
##########
@@ -0,0 +1,12 @@
+package org.apache.cassandra.sidecar.cdc.output;
+
+import java.io.Closeable;
+import org.apache.cassandra.sidecar.cdc.Change;
+
+/**
+ * Interface for emitting Cassandra PartitionUpdates
+ */
+public interface Output extends Closeable
+{
+    void emitChange(Change change) throws Exception;

Review comment:
       Besides, I would like to add a method similar to emitChange, but one that returns a PartitionUpdate.

##########
File path: src/main/java/org/apache/cassandra/sidecar/cdc/CDCReaderService.java
##########
@@ -0,0 +1,157 @@
+package org.apache.cassandra.sidecar.cdc;
+
+import java.io.InvalidObjectException;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Host;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import io.vertx.core.logging.Logger;
+import io.vertx.core.logging.LoggerFactory;
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.sidecar.CQLSession;
+
+/**
+ * Cassandra's real-time change data capture service.
+ */
+@Singleton
+public class CDCReaderService implements Host.StateListener
+{
+    private static final Logger logger = LoggerFactory.getLogger(CDCReaderService.class);
+    private final CDCIndexWatcher cdcIndexWatcher;
+    private final CDCRawDirectoryMonitor cdcRawDirectoryMonitor;
+    private final CQLSession session;
+    private final CassandraConfig cassandraConfig;
+
+    @Inject
+    public CDCReaderService(CDCIndexWatcher cdcIndexWatcher, CDCRawDirectoryMonitor monitor, CQLSession session,
+                            CassandraConfig cassandraConfig)
+    {
+        this.cdcRawDirectoryMonitor = monitor;
+        this.cdcIndexWatcher = cdcIndexWatcher;
+        this.session = session;
+        this.cassandraConfig = cassandraConfig;
+    }
+
+    public synchronized void start()
+    {
+        try
+        {
+            // Wait until the Cassandra server is UP to load configs and start subsystems. There's no guarantee the
+            // config is valid otherwise.
+            waitForCassandraServer();
+            Cluster cluster = session.getLocalCql().getCluster();
+            if (cluster == null)
+            {
+                throw new InvalidObjectException("Cannot connect to the local Cassandra node");
+            }
+
+            // Ensure Cassandra config is valid and remove mutable data paths from the config
+            // to ensure CDC reader doesn't accidentally step on Cassandra data.
+            this.cassandraConfig.init();
+            // TODO : Load metadata from the CQLSession.
+            Schema.instance.loadFromDisk(false);
+            this.cassandraConfig.muteConfigs();
+
+            for (String keySpace : Schema.instance.getKeyspaces())
+            {
+                logger.info("Keyspace : {}", keySpace);
+                KeyspaceMetadata keyspaceMetadata = Schema.instance.getKeyspaceMetadata(keySpace);
+                if (keyspaceMetadata == null)
+                {
+                    continue;
+                }
+                for (TableMetadata tableMetadata : keyspaceMetadata.tablesAndViews())
+                {
+                    logger.info("Table : {}, CDC enabled ? {}", tableMetadata.name,

Review comment:
       After reading the code, I think this block is unnecessary: whether CDC is enabled can be determined from the mutation in the MutationHandler. So we may not need to load the schema from disk to get the schema metadata either.






[GitHub] [cassandra-sidecar] Maxwell-Guo commented on a change in pull request #18: CDC reader in Apache Cassandra Sidecar - CASSANDRASC-27

Posted by GitBox <gi...@apache.org>.
Maxwell-Guo commented on a change in pull request #18:
URL: https://github.com/apache/cassandra-sidecar/pull/18#discussion_r514055952



##########
File path: src/main/java/org/apache/cassandra/sidecar/CassandraSidecarDaemon.java
##########
@@ -38,19 +38,22 @@
     private static final Logger logger = LoggerFactory.getLogger(CassandraSidecarDaemon.class);
     private final HttpServer server;
     private final Configuration config;
+    private final CDCReaderService cdcReaderService;
 
     @Inject
-    public CassandraSidecarDaemon(HttpServer server, Configuration config)
+    public CassandraSidecarDaemon(HttpServer server, Configuration config, CDCReaderService cdcReaderService)

Review comment:
       When is the cdcReaderService initialized?






[GitHub] [cassandra-sidecar] Maxwell-Guo commented on a change in pull request #18: CDC reader in Apache Cassandra Sidecar - CASSANDRASC-27

Posted by GitBox <gi...@apache.org>.
Maxwell-Guo commented on a change in pull request #18:
URL: https://github.com/apache/cassandra-sidecar/pull/18#discussion_r514137537



##########
File path: src/main/java/org/apache/cassandra/sidecar/cdc/CDCReaderService.java
##########
@@ -0,0 +1,157 @@
+package org.apache.cassandra.sidecar.cdc;
+
+import java.io.InvalidObjectException;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Host;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import io.vertx.core.logging.Logger;
+import io.vertx.core.logging.LoggerFactory;
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.sidecar.CQLSession;
+
+/**
+ * Cassandra's real-time change data capture service.
+ */
+@Singleton
+public class CDCReaderService implements Host.StateListener
+{
+    private static final Logger logger = LoggerFactory.getLogger(CDCReaderService.class);
+    private final CDCIndexWatcher cdcIndexWatcher;
+    private final CDCRawDirectoryMonitor cdcRawDirectoryMonitor;
+    private final CQLSession session;
+    private final CassandraConfig cassandraConfig;
+
+    @Inject
+    public CDCReaderService(CDCIndexWatcher cdcIndexWatcher, CDCRawDirectoryMonitor monitor, CQLSession session,
+                            CassandraConfig cassandraConfig)
+    {
+        this.cdcRawDirectoryMonitor = monitor;
+        this.cdcIndexWatcher = cdcIndexWatcher;
+        this.session = session;
+        this.cassandraConfig = cassandraConfig;
+    }
+
+    public synchronized void start()
+    {
+        try
+        {
+            // Wait until the Cassandra server is UP to load configs and start subsystems. There's no guarantee the
+            // config is valid otherwise.
+            waitForCassandraServer();
+            Cluster cluster = session.getLocalCql().getCluster();
+            if (cluster == null)
+            {
+                throw new InvalidObjectException("Cannot connect to the local Cassandra node");
+            }
+
+            // Ensure Cassandra config is valid and remove mutable data paths from the config
+            // to ensure CDC reader doesn't accidentally step on Cassandra data.
+            this.cassandraConfig.init();
+            // TODO : Load metadata from the CQLSession.
+            Schema.instance.loadFromDisk(false);
+            this.cassandraConfig.muteConfigs();
+
+            for (String keySpace : Schema.instance.getKeyspaces())
+            {
+                logger.info("Keyspace : {}", keySpace);
+                KeyspaceMetadata keyspaceMetadata = Schema.instance.getKeyspaceMetadata(keySpace);
+                if (keyspaceMetadata == null)
+                {
+                    continue;
+                }
+                for (TableMetadata tableMetadata : keyspaceMetadata.tablesAndViews())
+                {

Review comment:
       Here I think we can add a loop that checks the schema for tables with CDC enabled on every iteration.
   If no table has CDC enabled, we can sleep for a while (for example, the commit log sync period) and check again,
   so users who do not use CDC save some resources.
   The schema should also be refreshed on every iteration (I think the Cassandra driver already does this, so we can just use it).
   Once a table has CDC enabled, we go on to the next step.
   My code here is:
   
   // Ensure Cassandra config is valid and remove mutable data paths from the config
               // to ensure CDC reader doesn't accidentally step on Cassandra data.
               this.cassandraConfig.init();
   
               while (true)
               {
                   int cdcEnableTables = 0;
                   Metadata metadata = this.session.getLocalCql().getCluster().getMetadata();
                   List<KeyspaceMetadata> keyspaceMetadatas = metadata.getKeyspaces();
                   for (KeyspaceMetadata keyspaceMetadata : keyspaceMetadatas)
                   {
                       if (keyspaceMetadata == null)
                       {
                           continue;
                       }
                       for (TableMetadata tableMetadata : keyspaceMetadata.getTables())
                       {
                           if (tableMetadata.getOptions().isCDC())
                               ++cdcEnableTables;
                       }
                   }
                   if (cdcEnableTables != 0)
                       break;
                   logger.warn("No table has CDC enabled, sleeping for {} ms", DatabaseDescriptor.getCommitLogSyncPeriod());
                   Thread.sleep(DatabaseDescriptor.getCommitLogSyncPeriod());
               }
   
               this.cassandraConfig.muteConfigs();
               // Start monitoring the cdc_raw directory
               this.cdcRawDirectoryMonitor.startMonitoring();
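
   The polling idea can also be sketched independently of the driver. This is a minimal sketch under stated assumptions, not code from the PR: `CdcTablePoller`, `awaitCdcTable`, and the `BooleanSupplier` check are hypothetical names. In the sidecar the check would be whatever inspects the schema metadata, and the sleep interval would be the commit log sync period. Unlike an unbounded `while (true)` loop, it gives up after a fixed number of checks and restores the interrupt flag if the thread is interrupted.

```java
import java.util.function.BooleanSupplier;

// Hypothetical helper: polls a caller-supplied check until a CDC-enabled table shows up.
final class CdcTablePoller
{
    private CdcTablePoller() {}

    /**
     * Runs the check up to maxAttempts times, sleeping sleepMillis between checks.
     * Returns true as soon as the check passes, false if every attempt fails
     * or the waiting thread is interrupted.
     */
    static boolean awaitCdcTable(BooleanSupplier anyCdcTableEnabled, long sleepMillis, int maxAttempts)
    {
        for (int attempt = 0; attempt < maxAttempts; attempt++)
        {
            if (anyCdcTableEnabled.getAsBoolean())
            {
                return true;
            }
            if (attempt + 1 == maxAttempts)
            {
                break; // no point sleeping after the final check
            }
            try
            {
                Thread.sleep(sleepMillis);
            }
            catch (InterruptedException e)
            {
                // Preserve the interrupt for the caller and stop waiting.
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return false;
    }
}
```

   A caller could pass a lambda that walks the keyspaces and returns true when any table reports CDC enabled, then start the cdc_raw monitor only if the method returns true.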

##########
File path: src/main/java/org/apache/cassandra/sidecar/cdc/CDCReaderService.java
##########
@@ -0,0 +1,157 @@
+package org.apache.cassandra.sidecar.cdc;
+
+import java.io.InvalidObjectException;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Host;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import io.vertx.core.logging.Logger;
+import io.vertx.core.logging.LoggerFactory;
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.sidecar.CQLSession;
+
+/**
+ * Cassandra's real-time change data capture service.
+ */
+@Singleton
+public class CDCReaderService implements Host.StateListener
+{
+    private static final Logger logger = LoggerFactory.getLogger(CDCReaderService.class);
+    private final CDCIndexWatcher cdcIndexWatcher;
+    private final CDCRawDirectoryMonitor cdcRawDirectoryMonitor;
+    private final CQLSession session;
+    private final CassandraConfig cassandraConfig;
+
+    @Inject
+    public CDCReaderService(CDCIndexWatcher cdcIndexWatcher, CDCRawDirectoryMonitor monitor, CQLSession session,
+                            CassandraConfig cassandraConfig)
+    {
+        this.cdcRawDirectoryMonitor = monitor;
+        this.cdcIndexWatcher = cdcIndexWatcher;
+        this.session = session;
+        this.cassandraConfig = cassandraConfig;
+    }
+
+    public synchronized void start()
+    {
+        try
+        {
+            // Wait until the Cassandra server is UP to load configs and start subsystems. There's no guarantee the
+            // config is valid otherwise.
+            waitForCassandraServer();
+            Cluster cluster = session.getLocalCql().getCluster();
+            if (cluster == null)
+            {
+                throw new InvalidObjectException("Cannot connect to the local Cassandra node");
+            }
+
+            // Ensure Cassandra config is valid and remove mutable data paths from the config
+            // to ensure CDC reader doesn't accidentally step on Cassandra data.
+            this.cassandraConfig.init();
+            // TODO : Load metadata from the CQLSession.
+            Schema.instance.loadFromDisk(false);
+            this.cassandraConfig.muteConfigs();

Review comment:
       Why do we mute the loaded configuration? As far as I can tell we only use the CDC location now, so we could load the configuration once, read the values we need, and then set cassandraConfig to null if the concern is the memory the configuration holds.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] [cassandra-sidecar] Maxwell-Guo commented on a change in pull request #18: CDC reader in Apache Cassandra Sidecar - CASSANDRASC-27

Posted by GitBox <gi...@apache.org>.
Maxwell-Guo commented on a change in pull request #18:
URL: https://github.com/apache/cassandra-sidecar/pull/18#discussion_r517874552



##########
File path: src/main/java/org/apache/cassandra/sidecar/CassandraSidecarDaemon.java
##########
@@ -38,19 +38,22 @@
     private static final Logger logger = LoggerFactory.getLogger(CassandraSidecarDaemon.class);
     private final HttpServer server;
     private final Configuration config;
+    private final CDCReaderService cdcReaderService;
 
     @Inject
-    public CassandraSidecarDaemon(HttpServer server, Configuration config)
+    public CassandraSidecarDaemon(HttpServer server, Configuration config, CDCReaderService cdcReaderService)
     {
         this.server = server;
         this.config = config;
+        this.cdcReaderService = cdcReaderService;
     }
 
     public void start()
     {
         banner(System.out);
         validate();
         logger.info("Starting Cassandra Sidecar on {}:{}", config.getHost(), config.getPort());
+        cdcReaderService.start();

Review comment:
       yes






[GitHub] [cassandra-sidecar] tharanga commented on a change in pull request #18: CDC reader in Apache Cassandra Sidecar - CASSANDRASC-27

Posted by GitBox <gi...@apache.org>.
tharanga commented on a change in pull request #18:
URL: https://github.com/apache/cassandra-sidecar/pull/18#discussion_r517720623



##########
File path: src/main/java/org/apache/cassandra/sidecar/cdc/output/Output.java
##########
@@ -0,0 +1,12 @@
+package org.apache.cassandra.sidecar.cdc.output;
+
+import java.io.Closeable;
+import org.apache.cassandra.sidecar.cdc.Change;
+
+/**
+ * Interface for emitting Cassandra PartitionUpdates
+ */
+public interface Output extends Closeable
+{
+    void emitChange(Change change) throws Exception;

Review comment:
       What would the method signature look like in that case?






[GitHub] [cassandra-sidecar] tharanga commented on a change in pull request #18: CDC reader in Apache Cassandra Sidecar - CASSANDRASC-27

Posted by GitBox <gi...@apache.org>.
tharanga commented on a change in pull request #18:
URL: https://github.com/apache/cassandra-sidecar/pull/18#discussion_r517728534



##########
File path: src/main/java/org/apache/cassandra/sidecar/cdc/CDCBookmark.java
##########
@@ -0,0 +1,213 @@
+package org.apache.cassandra.sidecar.cdc;
+
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.locks.ReentrantLock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.commitlog.CommitLogDescriptor;
+import org.apache.cassandra.db.commitlog.CommitLogPosition;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.sidecar.Configuration;
+
+/**
+ * Manages the CDC reader bookmark. This tracks the last successfully processed offset
+ * of a commit log.
+ */
+@Singleton
+public class CDCBookmark extends TimerTask
+{
+    // Tracks last disk sync'd commit log position.
+    private CommitLogPosition lastSyncedPosition;
+    // Tracks last successfully processed commit log position by the CDC reader.
+    private CommitLogPosition lastProcessedPosition;
+    private final Timer timer;
+    private static final String BOOKMARK = "CdcReader.bookmark";
+    private static final Logger logger = LoggerFactory.getLogger(CDCBookmark.class);
+    private final ReentrantLock bookmarkLock = new ReentrantLock();
+    private final Configuration conf;
+
+    @Inject
+    CDCBookmark(Configuration conf)

Review comment:
       +1






[GitHub] [cassandra-sidecar] Maxwell-Guo commented on a change in pull request #18: CDC reader in Apache Cassandra Sidecar - CASSANDRASC-27

Posted by GitBox <gi...@apache.org>.
Maxwell-Guo commented on a change in pull request #18:
URL: https://github.com/apache/cassandra-sidecar/pull/18#discussion_r517874383



##########
File path: src/main/java/org/apache/cassandra/sidecar/CassandraSidecarDaemon.java
##########
@@ -38,19 +38,22 @@
     private static final Logger logger = LoggerFactory.getLogger(CassandraSidecarDaemon.class);
     private final HttpServer server;
     private final Configuration config;
+    private final CDCReaderService cdcReaderService;
 
     @Inject
-    public CassandraSidecarDaemon(HttpServer server, Configuration config)
+    public CassandraSidecarDaemon(HttpServer server, Configuration config, CDCReaderService cdcReaderService)
     {
         this.server = server;
         this.config = config;
+        this.cdcReaderService = cdcReaderService;
     }
 
     public void start()
     {
         banner(System.out);
         validate();
         logger.info("Starting Cassandra Sidecar on {}:{}", config.getHost(), config.getPort());
+        cdcReaderService.start();

Review comment:
       yes






[GitHub] [cassandra-sidecar] Maxwell-Guo commented on a change in pull request #18: CDC reader in Apache Cassandra Sidecar - CASSANDRASC-27

Posted by GitBox <gi...@apache.org>.
Maxwell-Guo commented on a change in pull request #18:
URL: https://github.com/apache/cassandra-sidecar/pull/18#discussion_r514055952



##########
File path: src/main/java/org/apache/cassandra/sidecar/CassandraSidecarDaemon.java
##########
@@ -38,19 +38,22 @@
     private static final Logger logger = LoggerFactory.getLogger(CassandraSidecarDaemon.class);
     private final HttpServer server;
     private final Configuration config;
+    private final CDCReaderService cdcReaderService;
 
     @Inject
-    public CassandraSidecarDaemon(HttpServer server, Configuration config)
+    public CassandraSidecarDaemon(HttpServer server, Configuration config, CDCReaderService cdcReaderService)

Review comment:
       When is cdcReaderService initialized?






[GitHub] [cassandra-sidecar] Maxwell-Guo commented on a change in pull request #18: CDC reader in Apache Cassandra Sidecar - CASSANDRASC-27

Posted by GitBox <gi...@apache.org>.
Maxwell-Guo commented on a change in pull request #18:
URL: https://github.com/apache/cassandra-sidecar/pull/18#discussion_r514074195



##########
File path: src/main/java/org/apache/cassandra/sidecar/cdc/CassandraConfig.java
##########
@@ -0,0 +1,68 @@
+package org.apache.cassandra.sidecar.cdc;
+
+import java.io.File;
+import java.nio.file.Paths;
+import java.util.function.Supplier;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.YamlConfigurationLoader;
+import org.apache.cassandra.sidecar.Configuration;
+
+/**
+ * Custom Cassandra configurator
+ */
+@Singleton
+public class CassandraConfig implements Supplier<Config>
+{
+    private Config config;
+
+    @Inject
+    public CassandraConfig(Configuration config)

Review comment:
       I think the input can just be a string holding the Cassandra config file path, which is also easy to inject.
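
   A minimal sketch of that shape, assuming nothing about the PR's actual classes: `CassandraConfigLocation` is a hypothetical stand-in that depends only on the path string rather than on the whole sidecar `Configuration`. How the string is injected (for example through a named binding) is left out.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical holder for the cassandra.yaml location; takes only the path string.
final class CassandraConfigLocation
{
    private final Path yamlPath;

    CassandraConfigLocation(String configPath)
    {
        if (configPath == null || configPath.trim().isEmpty())
        {
            throw new IllegalArgumentException("cdc configPath must be set");
        }
        // Normalize once so later log messages and comparisons use a stable absolute path.
        this.yamlPath = Paths.get(configPath).toAbsolutePath().normalize();
    }

    Path yamlPath()
    {
        return yamlPath;
    }

    /** True if the path points at an existing, readable regular file. */
    boolean isReadable()
    {
        return Files.isRegularFile(yamlPath) && Files.isReadable(yamlPath);
    }
}
```

   Keeping the constructor argument this small also makes the class easy to exercise in a unit test with a temporary file.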






[GitHub] [cassandra-sidecar] Maxwell-Guo commented on a change in pull request #18: CDC reader in Apache Cassandra Sidecar - CASSANDRASC-27

Posted by GitBox <gi...@apache.org>.
Maxwell-Guo commented on a change in pull request #18:
URL: https://github.com/apache/cassandra-sidecar/pull/18#discussion_r514110640



##########
File path: src/main/java/org/apache/cassandra/sidecar/MainModule.java
##########
@@ -126,6 +130,14 @@ public Router vertxRouter(Vertx vertx)
         return router;
     }
 
+    @Override
+    protected void  configure()
+    {
+        // TODO: Make the output type configurable
+        bind(CDCReaderMonitor.class).to(CDCReaderMonitorLogger.class);
+        bind(Output.class).to(ConsoleOutput.class);

Review comment:
       Can we remove this method and, as with HttpServer/Vertx/VertxRequestHandler/Router/Configuration, use @Provides to supply the real value, so that the output is configurable?
   This is my code:
    @Provides
       @Singleton
       public Output outPut(Configuration conf)
       {
           String outPutClass = conf.getOutPut();
           if (!outPutClass.contains("."))
               outPutClass = "org.apache.cassandra.sidecar.common.output." + outPutClass; // I move the out put to common dir
           Output output = FBUtilities.construct(outPutClass, "output");
           return output;
       }
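
   The name resolution and reflective construction in that provider can be sketched with the JDK alone. This is an illustrative sketch, not the sidecar's code: `OutputFactory`, its nested `Output` interface, and `ConsoleOutput` are hypothetical stand-ins, and the default package is passed in as a parameter rather than hard-coded.

```java
// Hypothetical factory: resolves a configured output name to a class and instantiates it.
final class OutputFactory
{
    /** Stand-in for the sidecar's Output interface. */
    interface Output
    {
        String describe();
    }

    /** Stand-in implementation with a no-argument constructor. */
    static final class ConsoleOutput implements Output
    {
        @Override
        public String describe()
        {
            return "console";
        }
    }

    private OutputFactory() {}

    /** Short names get the default package prepended; names containing a dot are used as-is. */
    static String qualify(String configuredName, String defaultPackage)
    {
        return configuredName.contains(".") ? configuredName : defaultPackage + "." + configuredName;
    }

    /** Loads className, checks it implements expectedType, and calls its no-argument constructor. */
    static <T> T construct(String className, Class<T> expectedType)
    {
        try
        {
            Class<?> raw = Class.forName(className);
            return raw.asSubclass(expectedType).getDeclaredConstructor().newInstance();
        }
        catch (ReflectiveOperationException | ClassCastException e)
        {
            // Fail at startup with the offending name instead of surfacing a raw reflection error.
            throw new IllegalArgumentException("Cannot create output '" + className + "'", e);
        }
    }
}
```

   Checking the loaded class against the expected type before instantiating it means a misconfigured class name fails with a clear message at startup rather than with a cast error at the first emitted change.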






[GitHub] [cassandra-sidecar] Maxwell-Guo commented on a change in pull request #18: CDC reader in Apache Cassandra Sidecar - CASSANDRASC-27

Posted by GitBox <gi...@apache.org>.
Maxwell-Guo commented on a change in pull request #18:
URL: https://github.com/apache/cassandra-sidecar/pull/18#discussion_r514188707



##########
File path: src/main/java/org/apache/cassandra/sidecar/cdc/CDCRawDirectoryMonitor.java
##########
@@ -0,0 +1,90 @@
+package org.apache.cassandra.sidecar.cdc;
+
+import java.io.IOException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Timer;
+import java.util.TimerTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.sidecar.metrics.cdc.CDCReaderMonitor;
+
+/**
+ * Monitors the cdc_raw directory, cleanup unused commit logs and report metrics
+ * */
+@Singleton
+public class CDCRawDirectoryMonitor extends TimerTask
+{
+
+    private final Timer timer;
+    private final CDCReaderMonitor monitor;
+    private volatile boolean running;
+    private static final Logger logger = LoggerFactory.getLogger(CDCRawDirectoryMonitor.class);
+
+    @Inject
+    CDCRawDirectoryMonitor(CDCReaderMonitor monitor)
+    {
+        this.timer = new Timer();
+        this.monitor = monitor;
+        this.running = false;
+    }
+
+    /**
+     * Starts the background thread to monitor the cdc_raw dir.
+     * */
+    public void startMonitoring()
+    {
+        this.running = true;
+        timer.schedule(this, 0, DatabaseDescriptor.getCDCDiskCheckInterval());
+    }
+
+    @Override
+    public void run()
+    {
+        if (!this.running)
+        {
+            return;
+        }
+        // TODO : Don't be someone who just complains, do some useful work, clean files older than
+        //  the last persisted bookmark.
+        this.monitor.reportCdcRawDirectorySizeInBytes(getCdcRawDirectorySize());

Review comment:
       Do we not clean up the CDC data? I saw your TODO, but why don't we delete the CDC commit log files that are no longer needed once their changes have been emitted?
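
   One way the cleanup could be decided is by comparing each file's segment id with the segment id stored in the bookmark. The sketch below is an assumption-laden illustration, not the PR's implementation: `CdcRawCleaner` is a hypothetical name, and the file-name pattern (`CommitLog-<version>-<segmentId>.log` plus a matching `_cdc.idx` file) is assumed rather than taken from the PR. It only selects candidates; actual deletion is left to the caller.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: picks cdc_raw files that belong to segments older than the bookmark.
final class CdcRawCleaner
{
    // Assumed naming: CommitLog-<version>-<segmentId>.log and CommitLog-<version>-<segmentId>_cdc.idx
    private static final Pattern SEGMENT_FILE =
        Pattern.compile("CommitLog-\\d+-(\\d{1,18})(\\.log|_cdc\\.idx)");

    private CdcRawCleaner() {}

    /** Extracts the segment id from a file name, or empty if the name does not match. */
    static OptionalLong segmentId(String fileName)
    {
        Matcher m = SEGMENT_FILE.matcher(fileName);
        return m.matches() ? OptionalLong.of(Long.parseLong(m.group(1))) : OptionalLong.empty();
    }

    /** Returns the files whose segment id is strictly lower than the bookmarked segment id. */
    static List<String> deletable(List<String> fileNames, long bookmarkedSegmentId)
    {
        List<String> result = new ArrayList<>();
        for (String name : fileNames)
        {
            OptionalLong id = segmentId(name);
            // The bookmarked segment itself may still be in progress, so it is kept.
            if (id.isPresent() && id.getAsLong() < bookmarkedSegmentId)
            {
                result.add(name);
            }
        }
        return result;
    }
}
```

   The monitor's timer task could list the cdc_raw directory, pass the names through `deletable`, and remove the results with `Files.deleteIfExists`, but only after the bookmark has been persisted to disk.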






[GitHub] [cassandra-sidecar] Maxwell-Guo commented on a change in pull request #18: CDC reader in Apache Cassandra Sidecar - CASSANDRASC-27

Posted by GitBox <gi...@apache.org>.
Maxwell-Guo commented on a change in pull request #18:
URL: https://github.com/apache/cassandra-sidecar/pull/18#discussion_r514057529



##########
File path: src/main/java/org/apache/cassandra/sidecar/cdc/CDCReaderService.java
##########
@@ -0,0 +1,157 @@
+package org.apache.cassandra.sidecar.cdc;
+
+import java.io.InvalidObjectException;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Host;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import io.vertx.core.logging.Logger;
+import io.vertx.core.logging.LoggerFactory;
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.sidecar.CQLSession;
+
+/**
+ * Cassandra's real-time change data capture service.
+ */
+@Singleton
+public class CDCReaderService implements Host.StateListener
+{
+    private static final Logger logger = LoggerFactory.getLogger(CDCReaderService.class);
+    private final CDCIndexWatcher cdcIndexWatcher;
+    private final CDCRawDirectoryMonitor cdcRawDirectoryMonitor;
+    private final CQLSession session;
+    private final CassandraConfig cassandraConfig;
+
+    @Inject
+    public CDCReaderService(CDCIndexWatcher cdcIndexWatcher, CDCRawDirectoryMonitor monitor, CQLSession session,

Review comment:
       CDCReaderService is injected here, but I do not see where the instance is bound or provided. The same goes for CDCIndexWatcher and CDCRawDirectoryMonitor.



