You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rh...@apache.org on 2020/06/11 03:39:34 UTC

[kafka] branch 2.6 updated (1665519 -> df2ab9f)

This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a change to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git.


    from 1665519  KAFKA-7833: Add missing test (#8847)
     new eb58b61  KAFKA-9066: Retain metrics for failed tasks (#8502)
     new df2ab9f  KAFKA-9845: Warn users about using config providers with plugin.path property (#8455)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/kafka/connect/runtime/Worker.java   |  7 +++++-
 .../apache/kafka/connect/runtime/WorkerConfig.java | 24 ++++++++++++++++++-
 .../kafka/connect/runtime/WorkerSinkTask.java      |  8 +++++--
 .../kafka/connect/runtime/WorkerSourceTask.java    |  8 +++++--
 .../apache/kafka/connect/runtime/WorkerTask.java   | 27 ++++++++--------------
 .../kafka/connect/runtime/WorkerTaskTest.java      |  9 --------
 .../apache/kafka/connect/runtime/WorkerTest.java   | 12 ++++++++++
 .../runtime/WorkerWithTopicCreationTest.java       | 12 ++++++++++
 8 files changed, 75 insertions(+), 32 deletions(-)


[kafka] 02/02: KAFKA-9845: Warn users about using config providers with plugin.path property (#8455)

Posted by rh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit df2ab9f4c9e44187455366848522cb135e6ab6e8
Author: Chris Egerton <ch...@confluent.io>
AuthorDate: Wed Jun 10 20:06:38 2020 -0700

    KAFKA-9845: Warn users about using config providers with plugin.path property (#8455)
    
    * KAFKA-9845: Fix plugin.path when config provider is used
    
    * Revert "KAFKA-9845: Fix plugin.path when config provider is used"
    
    This reverts commit 96caaa9a4934bcef78d7b145d18aa1718cb10009.
    
    * KAFKA-9845: Emit ERROR-level log message when config provider is used for plugin.path property
    
    * KAFKA-9845: Demote log message level from ERROR to WARN
    
    Co-Authored-By: Nigel Liang <ni...@nigelliang.com>
    
    * KAFKA-94845: Fix failing unit tests
    
    * KAFKA-9845: Add warning message to docstring for plugin.path config
    
    * KAFKA-9845: Apply suggestions from code review
    
    Co-authored-by: Randall Hauch <rh...@gmail.com>
    
    Co-authored-by: Nigel Liang <ni...@nigelliang.com>
    Co-authored-by: Randall Hauch <rh...@gmail.com>
---
 .../apache/kafka/connect/runtime/WorkerConfig.java | 24 +++++++++++++++++++++-
 1 file changed, 23 insertions(+), 1 deletion(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
index 1b31a5f..a137b2d 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
@@ -38,6 +38,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.regex.Pattern;
 
 import org.eclipse.jetty.util.StringUtil;
@@ -212,7 +213,10 @@ public class WorkerConfig extends AbstractConfig {
             + "plugins and their dependencies\n"
             + "Note: symlinks will be followed to discover dependencies or plugins.\n"
             + "Examples: plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,"
-            + "/opt/connectors";
+            + "/opt/connectors\n" 
+            + "Do not use config provider variables in this property, since the raw path is used "
+            + "by the worker's scanner before config providers are initialized and used to "
+            + "replace variables.";
 
     public static final String CONFIG_PROVIDERS_CONFIG = "config.providers";
     protected static final String CONFIG_PROVIDERS_DOC =
@@ -407,6 +411,23 @@ public class WorkerConfig extends AbstractConfig {
         }
     }
 
+    private void logPluginPathConfigProviderWarning(Map<String, String> rawOriginals) {
+        String rawPluginPath = rawOriginals.get(PLUGIN_PATH_CONFIG);
+        // Can't use AbstractConfig::originalsStrings here since some values may be null, which
+        // causes that method to fail
+        String transformedPluginPath = Objects.toString(originals().get(PLUGIN_PATH_CONFIG));
+        if (!Objects.equals(rawPluginPath, transformedPluginPath)) {
+            log.warn(
+                "Variables cannot be used in the 'plugin.path' property, since the property is "
+                + "used by plugin scanning before the config providers that replace the " 
+                + "variables are initialized. The raw value '{}' was used for plugin scanning, as " 
+                + "opposed to the transformed value '{}', and this may cause unexpected results.",
+                rawPluginPath,
+                transformedPluginPath
+            );
+        }
+    }
+
     public Integer getRebalanceTimeout() {
         return null;
     }
@@ -430,6 +451,7 @@ public class WorkerConfig extends AbstractConfig {
     public WorkerConfig(ConfigDef definition, Map<String, String> props) {
         super(definition, props);
         logInternalConverterDeprecationWarnings(props);
+        logPluginPathConfigProviderWarning(props);
     }
 
     // Visible for testing


[kafka] 01/02: KAFKA-9066: Retain metrics for failed tasks (#8502)

Posted by rh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit eb58b61603e8bdb0ccf94af3f956a09b5591d5b3
Author: Chris Egerton <ch...@confluent.io>
AuthorDate: Wed Jun 10 20:03:25 2020 -0700

    KAFKA-9066: Retain metrics for failed tasks (#8502)
    
    Author: Chris Egerton <ch...@confluent.io>
    Reviewers: Nigel Liang <ni...@nigelliang.com>, Randall Hauch <rh...@gmail.com>
---
 .../org/apache/kafka/connect/runtime/Worker.java   |  7 +++++-
 .../kafka/connect/runtime/WorkerSinkTask.java      |  8 +++++--
 .../kafka/connect/runtime/WorkerSourceTask.java    |  8 +++++--
 .../apache/kafka/connect/runtime/WorkerTask.java   | 27 ++++++++--------------
 .../kafka/connect/runtime/WorkerTaskTest.java      |  9 --------
 .../apache/kafka/connect/runtime/WorkerTest.java   | 12 ++++++++++
 .../runtime/WorkerWithTopicCreationTest.java       | 12 ++++++++++
 7 files changed, 52 insertions(+), 31 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 26b0444..67a27e1 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -789,13 +789,18 @@ public class Worker {
                 return;
             }
 
-            connectorStatusMetricsGroup.recordTaskRemoved(taskId);
             if (!task.awaitStop(timeout)) {
                 log.error("Graceful stop of task {} failed.", task.id());
                 task.cancel();
             } else {
                 log.debug("Graceful stop of task {} succeeded.", task.id());
             }
+
+            try {
+                task.removeMetrics();
+            } finally {
+                connectorStatusMetricsGroup.recordTaskRemoved(taskId);
+            }
         }
     }
 
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index c22ce4a..50efffc 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -174,8 +174,12 @@ class WorkerSinkTask extends WorkerTask {
     }
 
     @Override
-    protected void releaseResources() {
-        sinkTaskMetricsGroup.close();
+    public void removeMetrics() {
+        try {
+            sinkTaskMetricsGroup.close();
+        } finally {
+            super.removeMetrics();
+        }
     }
 
     @Override
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index e44d93a..1febd7f 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -188,8 +188,12 @@ class WorkerSourceTask extends WorkerTask {
     }
 
     @Override
-    protected void releaseResources() {
-        sourceTaskMetricsGroup.close();
+    public void removeMetrics() {
+        try {
+            sourceTaskMetricsGroup.close();
+        } finally {
+            super.removeMetrics();
+        }
     }
 
     @Override
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
index b0a6a0c..11b0746 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
@@ -144,16 +144,17 @@ abstract class WorkerTask implements Runnable {
         }
     }
 
+    /**
+     * Remove all metrics published by this task.
+     */
+    public void removeMetrics() {
+        taskMetricsGroup.close();
+    }
+
     protected abstract void execute();
 
     protected abstract void close();
 
-    /**
-     * Method called when this worker task has been completely closed, and when the subclass should clean up
-     * all resources.
-     */
-    protected abstract void releaseResources();
-
     protected boolean isStopping() {
         return stopping;
     }
@@ -239,17 +240,9 @@ abstract class WorkerTask implements Runnable {
                 if (t instanceof Error)
                     throw (Error) t;
             } finally {
-                try {
-                    Thread.currentThread().setName(savedName);
-                    Plugins.compareAndSwapLoaders(savedLoader);
-                    shutdownLatch.countDown();
-                } finally {
-                    try {
-                        releaseResources();
-                    } finally {
-                        taskMetricsGroup.close();
-                    }
-                }
+                Thread.currentThread().setName(savedName);
+                Plugins.compareAndSwapLoaders(savedLoader);
+                shutdownLatch.countDown();
             }
         }
     }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
index 44c45d5..c26a88c 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
@@ -109,9 +109,6 @@ public class WorkerTaskTest {
         workerTask.close();
         expectLastCall();
 
-        workerTask.releaseResources();
-        EasyMock.expectLastCall();
-
         statusListener.onShutdown(taskId);
         expectLastCall();
 
@@ -153,9 +150,6 @@ public class WorkerTaskTest {
         workerTask.close();
         EasyMock.expectLastCall();
 
-        workerTask.releaseResources();
-        EasyMock.expectLastCall();
-
         replay(workerTask);
 
         workerTask.initialize(TASK_CONFIG);
@@ -219,9 +213,6 @@ public class WorkerTaskTest {
         workerTask.close();
         expectLastCall();
 
-        workerTask.releaseResources();
-        EasyMock.expectLastCall();
-
         // there should be no call to onShutdown()
 
         replay(workerTask);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index 48ab58f..5177acf 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -602,6 +602,9 @@ public class WorkerTest extends ThreadedTest {
         EasyMock.expect(workerTask.awaitStop(EasyMock.anyLong())).andStubReturn(true);
         EasyMock.expectLastCall();
 
+        workerTask.removeMetrics();
+        EasyMock.expectLastCall();
+
         expectStopStorage();
         expectClusterId();
 
@@ -677,6 +680,9 @@ public class WorkerTest extends ThreadedTest {
         EasyMock.expect(workerTask.awaitStop(EasyMock.anyLong())).andStubReturn(true);
         EasyMock.expectLastCall();
 
+        workerTask.removeMetrics();
+        EasyMock.expectLastCall();
+
         // Each time we check the task metrics, the worker will call the herder
         herder.taskStatus(TASK_ID);
         EasyMock.expectLastCall()
@@ -890,6 +896,9 @@ public class WorkerTest extends ThreadedTest {
         // Note that in this case we *do not* commit offsets since it's an unclean shutdown
         EasyMock.expectLastCall();
 
+        workerTask.removeMetrics();
+        EasyMock.expectLastCall();
+
         expectStopStorage();
         expectClusterId();
 
@@ -964,6 +973,9 @@ public class WorkerTest extends ThreadedTest {
         EasyMock.expect(workerTask.awaitStop(EasyMock.anyLong())).andStubReturn(true);
         EasyMock.expectLastCall();
 
+        workerTask.removeMetrics();
+        EasyMock.expectLastCall();
+
         expectStopStorage();
         expectClusterId();
 
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerWithTopicCreationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerWithTopicCreationTest.java
index 8b54033..f698a30 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerWithTopicCreationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerWithTopicCreationTest.java
@@ -594,6 +594,9 @@ public class WorkerWithTopicCreationTest extends ThreadedTest {
         EasyMock.expect(workerTask.awaitStop(EasyMock.anyLong())).andStubReturn(true);
         EasyMock.expectLastCall();
 
+        workerTask.removeMetrics();
+        EasyMock.expectLastCall();
+
         expectStopStorage();
         expectClusterId();
 
@@ -669,6 +672,9 @@ public class WorkerWithTopicCreationTest extends ThreadedTest {
         EasyMock.expect(workerTask.awaitStop(EasyMock.anyLong())).andStubReturn(true);
         EasyMock.expectLastCall();
 
+        workerTask.removeMetrics();
+        EasyMock.expectLastCall();
+
         // Each time we check the task metrics, the worker will call the herder
         herder.taskStatus(TASK_ID);
         EasyMock.expectLastCall()
@@ -879,6 +885,9 @@ public class WorkerWithTopicCreationTest extends ThreadedTest {
         // Note that in this case we *do not* commit offsets since it's an unclean shutdown
         EasyMock.expectLastCall();
 
+        workerTask.removeMetrics();
+        EasyMock.expectLastCall();
+
         expectStopStorage();
         expectClusterId();
 
@@ -953,6 +962,9 @@ public class WorkerWithTopicCreationTest extends ThreadedTest {
         EasyMock.expect(workerTask.awaitStop(EasyMock.anyLong())).andStubReturn(true);
         EasyMock.expectLastCall();
 
+        workerTask.removeMetrics();
+        EasyMock.expectLastCall();
+
         expectStopStorage();
         expectClusterId();