Posted to github@beam.apache.org by "lukecwik (via GitHub)" <gi...@apache.org> on 2023/02/02 17:15:26 UTC

[GitHub] [beam] lukecwik commented on a diff in pull request #24713: Caching DebeziumIO SourceTask instances to reduce the number of outstanding connections

lukecwik commented on code in PR #24713:
URL: https://github.com/apache/beam/pull/24713#discussion_r1094806645


##########
sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaSourceConsumerFn.java:
##########
@@ -202,6 +209,63 @@ private static Instant ensureTimestampWithinBounds(Instant timestamp) {
     return timestamp;
   }
 
+  private static final Cache<Map<String, String>, SourceTask> CONNECTOR_CACHE =
+      CacheBuilder.newBuilder()
+          .expireAfterAccess(java.time.Duration.ofSeconds(60))
+          .removalListener(
+              new RemovalListener<Map<String, String>, SourceTask>() {
+                @Override
+                public void onRemoval(
+                    RemovalNotification<Map<String, String>, SourceTask> removalNotification) {
+                  LOG.debug(
+                      "Task for key [[{}]] is being removed. Cause: {}",
+                      removalNotification.getKey(),
+                      removalNotification.getCause());
+                  removalNotification.getValue().stop();
+                }
+              })
+          .maximumSize(10)
+          .build();
+
+  SourceTask getDebeziumSourceTask(
+      Map<String, String> element, RestrictionTracker<OffsetHolder, Map<String, Object>> tracker) {
+    final KafkaSourceConsumerFn<T> dofnInstance = this;
+    try {
+      return CONNECTOR_CACHE.get(
+          element,
+          new Callable<SourceTask>() {
+            @Override
+            public SourceTask call() throws Exception {
+              SourceConnector connector = null;
+              SourceTask innerTask;
+              Map<String, String> configuration = new HashMap<>(element);
+              configuration.put(BEAM_INSTANCE_PROPERTY, dofnInstance.getHashCode());
+              try {
+                connector = connectorClass.getDeclaredConstructor().newInstance();
+                connector.start(configuration);
+                innerTask =
+                    (SourceTask) connector.taskClass().getDeclaredConstructor().newInstance();
+              } catch (InstantiationException
+                  | IllegalAccessException
+                  | InvocationTargetException
+                  | NoSuchMethodException e) {
+                throw new RuntimeException(
+                    "Unable to initialize connector instance for Debezium", e);
+              }
+              Map<String, ?> consumerOffset = tracker.currentRestriction().offset;
+              LOG.debug("--------- Created new Debezium task with offset: {}", consumerOffset);
+
+              innerTask.initialize(new BeamSourceTaskContext(tracker.currentRestriction().offset));

Review Comment:
   ```suggestion
                 innerTask.initialize(new BeamSourceTaskContext(consumerOffset));
   ```



##########
sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaSourceConsumerFn.java:
##########
@@ -202,6 +209,63 @@ private static Instant ensureTimestampWithinBounds(Instant timestamp) {
     return timestamp;
   }
 
+  private static final Cache<Map<String, String>, SourceTask> CONNECTOR_CACHE =
+      CacheBuilder.newBuilder()
+          .expireAfterAccess(java.time.Duration.ofSeconds(60))
+          .removalListener(
+              new RemovalListener<Map<String, String>, SourceTask>() {
+                @Override
+                public void onRemoval(
+                    RemovalNotification<Map<String, String>, SourceTask> removalNotification) {
+                  LOG.debug(
+                      "Task for key [[{}]] is being removed. Cause: {}",
+                      removalNotification.getKey(),
+                      removalNotification.getCause());
+                  removalNotification.getValue().stop();
+                }
+              })
+          .maximumSize(10)
+          .build();
+
+  SourceTask getDebeziumSourceTask(
+      Map<String, String> element, RestrictionTracker<OffsetHolder, Map<String, Object>> tracker) {
+    final KafkaSourceConsumerFn<T> dofnInstance = this;
+    try {

Review Comment:
   If the cache already holds a value for this key, do we need to update the consumer offset on the task that is returned? Currently the offset is only set when the task is first created.
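
A minimal sketch of the concern, using only the JDK rather than the Guava `Cache` from the diff: `ConcurrentHashMap.computeIfAbsent` stands in for `Cache.get(key, Callable)`, since both run the loader only on a miss. `StaleOffsetDemo`, `FakeTask`, and `getTask` are hypothetical names for illustration, not part of Beam or Debezium.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class StaleOffsetDemo {
  /** Hypothetical stand-in for a task that captures its starting offset at creation. */
  static final class FakeTask {
    final long startOffset;

    FakeTask(long startOffset) {
      this.startOffset = startOffset;
    }
  }

  // Keyed by configuration only, like the cache in the diff.
  private static final Map<String, FakeTask> CACHE = new ConcurrentHashMap<>();

  /** The mapping function runs only on a miss, so 'offset' is ignored on a hit. */
  static FakeTask getTask(String config, long offset) {
    return CACHE.computeIfAbsent(config, k -> new FakeTask(offset));
  }
}
```

With this shape, a second call for the same configuration returns the task built with the first caller's offset. If that is not the intended behavior, the offset would have to be applied outside the loader.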



##########
sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaSourceConsumerFn.java:
##########
@@ -202,6 +209,63 @@ private static Instant ensureTimestampWithinBounds(Instant timestamp) {
     return timestamp;
   }
 
+  private static final Cache<Map<String, String>, SourceTask> CONNECTOR_CACHE =
+      CacheBuilder.newBuilder()
+          .expireAfterAccess(java.time.Duration.ofSeconds(60))
+          .removalListener(
+              new RemovalListener<Map<String, String>, SourceTask>() {
+                @Override
+                public void onRemoval(
+                    RemovalNotification<Map<String, String>, SourceTask> removalNotification) {
+                  LOG.debug(
+                      "Task for key [[{}]] is being removed. Cause: {}",
+                      removalNotification.getKey(),
+                      removalNotification.getCause());
+                  removalNotification.getValue().stop();
+                }
+              })
+          .maximumSize(10)
+          .build();
+
+  SourceTask getDebeziumSourceTask(
+      Map<String, String> element, RestrictionTracker<OffsetHolder, Map<String, Object>> tracker) {

Review Comment:
   Can the source task be used in parallel if there are multiple elements with the same configuration but different restrictions? Multiple `element + restriction` pairs can be executed in parallel, while the cache is keyed only by the `element`.
   
   If not, then you want to build an object pool rather than an object cache.
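
To make the distinction concrete, here is a minimal sketch of a keyed object pool using only the JDK. `KeyedPool`, `borrow`, and `release` are hypothetical names, not an existing Beam or Guava API. Unlike a cache, `borrow` removes the instance from the pool, so two callers with the same key never share one object.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical keyed pool: a borrowed instance has exactly one user at a time. */
final class KeyedPool<K, V> {
  private final Map<K, Deque<V>> idle = new HashMap<>();
  private final Function<K, V> factory;

  KeyedPool(Function<K, V> factory) {
    this.factory = factory;
  }

  /** Takes an idle instance for the key, or creates a new one if none is idle. */
  V borrow(K key) {
    V pooled;
    synchronized (this) {
      Deque<V> queue = idle.get(key);
      pooled = (queue == null) ? null : queue.pollFirst();
    }
    // Create outside the lock so a slow factory does not block other borrowers.
    return (pooled != null) ? pooled : factory.apply(key);
  }

  /** Returns the instance so the next borrower of the same key can reuse it. */
  synchronized void release(K key, V value) {
    idle.computeIfAbsent(key, k -> new ArrayDeque<>()).addFirst(value);
  }
}
```

A caller would pair `borrow` with `release` in a `try`/`finally` block. Eviction of idle instances (the `expireAfterAccess` and `maximumSize` behavior in the diff) is left out of this sketch and would need to be added separately.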


