Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/05/07 19:56:13 UTC

[GitHub] [kafka] rhauch commented on a change in pull request #2604: KAFKA-4794: Add access to OffsetStorageReader from SourceConnector

rhauch commented on a change in pull request #2604:
URL: https://github.com/apache/kafka/pull/2604#discussion_r421741455



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
##########
@@ -58,41 +61,40 @@
 
     private Map<String, String> config;
     private State state;
-
-    public WorkerConnector(String connName,
-                           Connector connector,
-                           ConnectorContext ctx,
-                           ConnectMetrics metrics,
-                           ConnectorStatus.Listener statusListener) {
+    private final OffsetStorageReader offsetStorageReader;
+
+    public WorkerConnector(final String connName,
+                           final Connector connector,
+                           final ConnectorContext ctx,
+                           final ConnectMetrics metrics,
+                           final ConnectorStatus.Listener statusListener,
+                           final OffsetStorageReader offsetStorageReader) {
         this.connName = connName;
         this.ctx = ctx;
         this.connector = connector;
         this.state = State.INIT;
         this.metrics = new ConnectorMetricsGroup(metrics, AbstractStatus.State.UNASSIGNED, statusListener);
         this.statusListener = this.metrics;
+        this.offsetStorageReader = offsetStorageReader;
     }
 
     public void initialize(ConnectorConfig connectorConfig) {
         try {
             this.config = connectorConfig.originalsStrings();
+
             log.debug("{} Initializing connector {}", this, connName);
             if (isSinkConnector()) {
                 SinkConnectorConfig.validate(config);
             }
 
-            connector.initialize(new ConnectorContext() {
-                @Override
-                public void requestTaskReconfiguration() {
-                    ctx.requestTaskReconfiguration();
-                }
-
-                @Override
-                public void raiseError(Exception e) {
-                    log.error("{} Connector raised an error", WorkerConnector.this, e);
-                    onFailure(e);
-                    ctx.raiseError(e);
-                }
-            });
+            final ConnectorContext delegateCtx = new DelegateToWorkerConnectorContext();
+
+            if (isSinkConnector()) {
+                SinkConnectorConfig.validate(config);
+                connector.initialize(new DelegateSinkConnectorContext(delegateCtx));
+            } else {
+                connector.initialize(new DelegateSourceConnectorContext(delegateCtx, offsetStorageReader));
+            }

Review comment:
       At one point these were anonymous classes, and the delegation perhaps made a bit more sense. Now that they are inner classes, it seems like it would be simpler and a lot less confusing if the `DelegateSinkConnectorContext` and `DelegateSourceConnectorContext` classes extended the `DelegateToWorkerConnectorContext` class. Doing this has several advantages:
   1. removes nesting/delegation and eliminates the chance of getting into an infinite loop
   2. eliminates duplicate code in these classes
   3. simplifies the context classes quite a bit
   4. makes it more obvious that the sink connector context has nothing extra over the base
   5. makes it more obvious that the source connector context only adds the offset storage reader
   6. simplifies this code a bit (see below)
   
   By keeping the `DelegateToWorkerConnectorContext` class non-abstract, we can maintain backward compatibility by still calling `initialize(...)` when the connector is neither a source nor a sink connector.
   
   This code becomes simpler, too:
   ```suggestion
               if (isSinkConnector()) {
                   SinkConnectorConfig.validate(config);
                   connector.initialize(new DelegateSinkConnectorContext());
               } else if (isSourceConnector()) {
                   connector.initialize(new DelegateSourceConnectorContext(offsetStorageReader));
               } else {
                   connector.initialize(new DelegateToWorkerConnectorContext());
               }
   ```
   This may look a little strange, but we don't actually check that the `Connector` instance passed into this `WorkerConnector` is an instance of either `SourceConnector` or `SinkConnector`. If it is neither, the framework still initialized it before this change, and I think we should keep that arguably strange behavior for backward compatibility.
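A minimal sketch of the proposed hierarchy and dispatch, assuming simplified stand-ins for the Connect API: the nested `ConnectorContext`, `SinkConnectorContext`, `SourceConnectorContext`, `OffsetStorageReader` and connector classes model only what this review discusses, and `WorkerConnectorSketch`, `failures` and `contextFor(...)` are hypothetical names, not the PR's code. The delegation logic lives once in the non-abstract base, the sink subclass adds nothing, the source subclass adds only the reader, and a connector that is neither a source nor a sink still gets a context.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: simplified stand-ins for the Kafka Connect types discussed in this review.
public class WorkerConnectorSketch {

    public interface ConnectorContext {
        void requestTaskReconfiguration();
        void raiseError(Exception e);
    }

    public interface SinkConnectorContext extends ConnectorContext { }

    // Stand-in for the real OffsetStorageReader; no methods are modeled here.
    public interface OffsetStorageReader { }

    public interface SourceConnectorContext extends ConnectorContext {
        OffsetStorageReader offsetStorageReader();
    }

    // Stand-ins for the connector classes; only their runtime type matters for dispatch.
    public static class Connector { }
    public static class SinkConnector extends Connector { }
    public static class SourceConnector extends Connector { }

    private final ConnectorContext ctx;                          // worker-level context being wrapped
    private final OffsetStorageReader offsetStorageReader;
    public final List<Exception> failures = new ArrayList<>();   // hypothetical record of onFailure calls

    public WorkerConnectorSketch(ConnectorContext ctx, OffsetStorageReader offsetStorageReader) {
        this.ctx = ctx;
        this.offsetStorageReader = offsetStorageReader;
    }

    private void onFailure(Exception e) {
        failures.add(e);
    }

    // Non-abstract base: the only copy of the delegation logic.
    public class DelegateToWorkerConnectorContext implements ConnectorContext {
        @Override
        public void requestTaskReconfiguration() {
            ctx.requestTaskReconfiguration();
        }

        @Override
        public void raiseError(Exception e) {
            onFailure(e);
            ctx.raiseError(e);
        }
    }

    // Adds nothing over the base; it only carries the sink-specific type.
    public class DelegateSinkConnectorContext extends DelegateToWorkerConnectorContext
            implements SinkConnectorContext { }

    // Adds only the offset storage reader.
    public class DelegateSourceConnectorContext extends DelegateToWorkerConnectorContext
            implements SourceConnectorContext {
        private final OffsetStorageReader reader;

        public DelegateSourceConnectorContext(OffsetStorageReader reader) {
            this.reader = reader;
        }

        @Override
        public OffsetStorageReader offsetStorageReader() {
            return reader;
        }
    }

    // Mirrors the suggested branching in initialize(...); config validation is omitted.
    public ConnectorContext contextFor(Connector connector) {
        if (connector instanceof SinkConnector) {
            return new DelegateSinkConnectorContext();
        } else if (connector instanceof SourceConnector) {
            return new DelegateSourceConnectorContext(offsetStorageReader);
        } else {
            // Neither source nor sink: still initialized, for backward compatibility.
            return new DelegateToWorkerConnectorContext();
        }
    }
}
```

Because the subclasses inherit `raiseError(...)` instead of forwarding to a wrapped context object, there is no delegation chain that could accidentally loop back on itself.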

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##########
@@ -239,6 +240,7 @@ public boolean startConnector(
             ConnectorStatus.Listener statusListener,
             TargetState initialState
     ) {
+

Review comment:
       Nit: let's avoid unrelated line additions.

##########
File path: connect/api/src/main/java/org/apache/kafka/connect/sink/SinkConnector.java
##########
@@ -34,4 +34,12 @@
      */
     public static final String TOPICS_CONFIG = "topics";
 
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    protected SinkConnectorContext context() {

Review comment:
       Right now this method is unused, which means it has no test coverage. The existing `MockConnector` class reads the `context` field from `Connector`; at a minimum, I think it would be worthwhile to consider changing it to call the `context()` method instead. `MockConnector` is wrapped by `MockSinkConnector` and `MockSourceConnector` and is used in unit tests.
   
   However, it's probably worth adding a few new unit test classes for `SinkConnector` and `SourceConnector` that verify that `context()` returns the same instance supplied to `initialize(...)`, that the `context` field holds that same instance, and that the casts in `SinkConnector.context()` and `SourceConnector.context()` are actually safe.
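A sketch of the checks suggested above, assuming simplified stand-ins for `Connector`, `SinkConnector`, `SourceConnector` and their contexts (the real classes have many more members) and plain assertions instead of JUnit; `TestSinkConnector`, `TestSourceConnector` and `verifySameInstance(...)` are hypothetical. It also shows why the cast is only as safe as the context the framework passes in: a `SinkConnector` initialized with a plain `ConnectorContext` throws `ClassCastException` from `context()`.

```java
// Sketch only: stand-ins for the Connect API types, with the suggested checks in main().
public class ConnectorContextSketch {

    public interface ConnectorContext { }
    public interface SinkConnectorContext extends ConnectorContext { }
    public interface SourceConnectorContext extends ConnectorContext { }

    public abstract static class Connector {
        protected ConnectorContext context; // the field existing code such as MockConnector reads

        public void initialize(ConnectorContext ctx) {
            context = ctx;
        }

        protected ConnectorContext context() {
            return context;
        }
    }

    public abstract static class SinkConnector extends Connector {
        // Covariant override: the cast succeeds only if initialize(...) got a SinkConnectorContext.
        @Override
        protected SinkConnectorContext context() {
            return (SinkConnectorContext) context;
        }
    }

    public abstract static class SourceConnector extends Connector {
        @Override
        protected SourceConnectorContext context() {
            return (SourceConnectorContext) context;
        }
    }

    // Hypothetical concrete connectors used only by these checks.
    public static class TestSinkConnector extends SinkConnector { }
    public static class TestSourceConnector extends SourceConnector { }

    // Both context() and the context field must return the instance passed to initialize(...).
    // Throws ClassCastException if the connector's context() cast does not match ctx.
    public static void verifySameInstance(Connector connector, ConnectorContext ctx) {
        connector.initialize(ctx);
        if (connector.context() != ctx || connector.context != ctx) {
            throw new AssertionError("context() or context field differs from the initialized instance");
        }
    }

    public static void main(String[] args) {
        verifySameInstance(new TestSinkConnector(), new SinkConnectorContext() { });
        verifySameInstance(new TestSourceConnector(), new SourceConnectorContext() { });

        // The cast is only as safe as the framework's dispatch: a plain context breaks it.
        TestSinkConnector misconfigured = new TestSinkConnector();
        misconfigured.initialize(new ConnectorContext() { });
        try {
            misconfigured.context();
            throw new AssertionError("expected ClassCastException");
        } catch (ClassCastException expected) {
            System.out.println("plain ConnectorContext rejected by SinkConnector.context()");
        }
    }
}
```

Running `main` prints `plain ConnectorContext rejected by SinkConnector.context()`; any failed check surfaces as an `AssertionError` instead.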

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
##########
@@ -58,41 +61,40 @@
 
     private Map<String, String> config;
     private State state;
-
-    public WorkerConnector(String connName,
-                           Connector connector,
-                           ConnectorContext ctx,
-                           ConnectMetrics metrics,
-                           ConnectorStatus.Listener statusListener) {
+    private final OffsetStorageReader offsetStorageReader;
+
+    public WorkerConnector(final String connName,
+                           final Connector connector,
+                           final ConnectorContext ctx,
+                           final ConnectMetrics metrics,
+                           final ConnectorStatus.Listener statusListener,
+                           final OffsetStorageReader offsetStorageReader) {
         this.connName = connName;
         this.ctx = ctx;
         this.connector = connector;
         this.state = State.INIT;
         this.metrics = new ConnectorMetricsGroup(metrics, AbstractStatus.State.UNASSIGNED, statusListener);
         this.statusListener = this.metrics;
+        this.offsetStorageReader = offsetStorageReader;
     }
 
     public void initialize(ConnectorConfig connectorConfig) {
         try {
             this.config = connectorConfig.originalsStrings();
+

Review comment:
       Nit: let's avoid unrelated line additions.

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java
##########
@@ -85,7 +87,7 @@ public void testInitializeFailure() {
 
         replayAll();
 
-        WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener);
+        WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener, offsetStorageReader);

Review comment:
       This existing test is a little strange: it instantiates neither a source nor a sink connector, and merely relies on `WorkerConnector` behaving mostly the same regardless of whether the `Connector` instance is an instance of `SourceConnector` or `SinkConnector`. This PR changes that behavior, so I think we need a bit more testing here.
   
   I'd suggest adding three more tests that verify the correct context type is passed to the `connector.initialize(...)` method. The context should be:
   * a plain `ConnectorContext` instance if the connector is neither a source nor a sink
   * a `SinkConnectorContext` instance if the connector is a sink
   * a `SourceConnectorContext` instance if the connector is a source
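A sketch of the three suggested tests, written without EasyMock so it is self-contained: a hypothetical `CapturingConnector` records the context passed to `initialize(...)`, and a hypothetical `initializeLikeWorker(...)` stands in for the dispatch under review. In `WorkerConnectorTest` itself, an EasyMock `Capture<ConnectorContext>` on the mocked connector's `initialize(...)` call could play the same role. Because `SinkConnectorContext` and `SourceConnectorContext` both extend `ConnectorContext`, the first test checks that the context is neither subtype, not merely that it is a `ConnectorContext`.

```java
// Sketch only: stand-ins for the Connect types; real tests would live in WorkerConnectorTest.
public class ContextTypeTestSketch {

    public interface ConnectorContext { }
    public interface SinkConnectorContext extends ConnectorContext { }
    public interface SourceConnectorContext extends ConnectorContext { }

    // Records the context it is initialized with, standing in for a mock plus argument capture.
    public static class CapturingConnector {
        ConnectorContext captured;

        public void initialize(ConnectorContext ctx) {
            captured = ctx;
        }
    }

    public static class CapturingSinkConnector extends CapturingConnector { }
    public static class CapturingSourceConnector extends CapturingConnector { }

    // Hypothetical stand-in for the branching in WorkerConnector.initialize(...).
    static void initializeLikeWorker(CapturingConnector connector) {
        if (connector instanceof CapturingSinkConnector) {
            connector.initialize(new SinkConnectorContext() { });
        } else if (connector instanceof CapturingSourceConnector) {
            connector.initialize(new SourceConnectorContext() { });
        } else {
            connector.initialize(new ConnectorContext() { });
        }
    }

    static void check(boolean condition, String message) {
        if (!condition) {
            throw new AssertionError(message);
        }
    }

    public static void testInitializeNeitherSourceNorSink() {
        CapturingConnector connector = new CapturingConnector();
        initializeLikeWorker(connector);
        check(connector.captured != null, "a context should be passed to initialize(...)");
        check(!(connector.captured instanceof SinkConnectorContext)
                && !(connector.captured instanceof SourceConnectorContext),
              "a plain connector should get a plain ConnectorContext");
    }

    public static void testInitializeSink() {
        CapturingConnector connector = new CapturingSinkConnector();
        initializeLikeWorker(connector);
        check(connector.captured instanceof SinkConnectorContext, "a sink should get a SinkConnectorContext");
    }

    public static void testInitializeSource() {
        CapturingConnector connector = new CapturingSourceConnector();
        initializeLikeWorker(connector);
        check(connector.captured instanceof SourceConnectorContext, "a source should get a SourceConnectorContext");
    }

    public static void main(String[] args) {
        testInitializeNeitherSourceNorSink();
        testInitializeSink();
        testInitializeSource();
        System.out.println("all three context-type checks passed");
    }
}
```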
