You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by GitBox <gi...@apache.org> on 2021/03/06 14:46:52 UTC

[GitHub] [camel-kafka-connector] valdar opened a new pull request #1091: Issue/969

valdar opened a new pull request #1091:
URL: https://github.com/apache/camel-kafka-connector/pull/1091


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [camel-kafka-connector] lburgazzoli commented on a change in pull request #1091: Issue/969

Posted by GitBox <gi...@apache.org>.
lburgazzoli commented on a change in pull request #1091:
URL: https://github.com/apache/camel-kafka-connector/pull/1091#discussion_r589037565



##########
File path: core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
##########
@@ -210,6 +255,18 @@ private long remaining(long startPollEpochMilli, long maxPollDuration)  {
         return records.isEmpty() ? null : records;
     }
 
+    @Override
+    public void commitRecord(SourceRecord record, RecordMetadata metadata) throws InterruptedException {

Review comment:
       who is throwing InterrupedException ? we need to be sure not to left an exchange uncompleted and not tracked any more in the array

##########
File path: core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
##########
@@ -177,31 +207,46 @@ private long remaining(long startPollEpochMilli, long maxPollDuration)  {
             Map<String, String> sourceOffset = Collections.singletonMap("position", exchange.getExchangeId());
 
             final Object messageHeaderKey = camelMessageHeaderKey != null ? exchange.getMessage().getHeader(camelMessageHeaderKey) : null;
-            final Object messageBodyValue = exchange.getMessage().getBody();
+            Object messageBodyValue = exchange.getMessage().getBody();
 
             final Schema messageKeySchema = messageHeaderKey != null ? SchemaHelper.buildSchemaBuilderForType(messageHeaderKey) : null;
             final Schema messageBodySchema = messageBodyValue != null ? SchemaHelper.buildSchemaBuilderForType(messageBodyValue) : null;
 
             final long timestamp = calculateTimestamp(exchange);
 
+            // take in account Cached camel streams
+            if (messageBodyValue instanceof StreamCache) {
+                StreamCache sc = (StreamCache) messageBodyValue;
+                // reset to be sure that the cache is ready to be used before sending it in the record (could be useful for SMTs)
+                sc.reset();
+                try {
+                    messageBodyValue = sc.copy(exchange);
+                } catch (IOException e) {
+                    e.printStackTrace();

Review comment:
       log or rethrow




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [camel-kafka-connector] valdar commented on pull request #1091: Issue/969

Posted by GitBox <gi...@apache.org>.
valdar commented on pull request #1091:
URL: https://github.com/apache/camel-kafka-connector/pull/1091#issuecomment-792881029


   > Need backport to camel master 
   
   backporting here: https://github.com/apache/camel-kafka-connector/pull/1094
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [camel-kafka-connector] oscerd commented on pull request #1091: Issue/969

Posted by GitBox <gi...@apache.org>.
oscerd commented on pull request #1091:
URL: https://github.com/apache/camel-kafka-connector/pull/1091#issuecomment-792281892


   Need backport to camel master


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [camel-kafka-connector] oscerd merged pull request #1091: Issue/969

Posted by GitBox <gi...@apache.org>.
oscerd merged pull request #1091:
URL: https://github.com/apache/camel-kafka-connector/pull/1091


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [camel-kafka-connector] valdar commented on a change in pull request #1091: Issue/969

Posted by GitBox <gi...@apache.org>.
valdar commented on a change in pull request #1091:
URL: https://github.com/apache/camel-kafka-connector/pull/1091#discussion_r589523811



##########
File path: core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
##########
@@ -177,31 +207,46 @@ private long remaining(long startPollEpochMilli, long maxPollDuration)  {
             Map<String, String> sourceOffset = Collections.singletonMap("position", exchange.getExchangeId());
 
             final Object messageHeaderKey = camelMessageHeaderKey != null ? exchange.getMessage().getHeader(camelMessageHeaderKey) : null;
-            final Object messageBodyValue = exchange.getMessage().getBody();
+            Object messageBodyValue = exchange.getMessage().getBody();
 
             final Schema messageKeySchema = messageHeaderKey != null ? SchemaHelper.buildSchemaBuilderForType(messageHeaderKey) : null;
             final Schema messageBodySchema = messageBodyValue != null ? SchemaHelper.buildSchemaBuilderForType(messageBodyValue) : null;
 
             final long timestamp = calculateTimestamp(exchange);
 
+            // take in account Cached camel streams
+            if (messageBodyValue instanceof StreamCache) {
+                StreamCache sc = (StreamCache) messageBodyValue;
+                // reset to be sure that the cache is ready to be used before sending it in the record (could be useful for SMTs)
+                sc.reset();
+                try {
+                    messageBodyValue = sc.copy(exchange);
+                } catch (IOException e) {
+                    e.printStackTrace();

Review comment:
       done in https://github.com/apache/camel-kafka-connector/pull/1093

##########
File path: core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
##########
@@ -210,6 +255,18 @@ private long remaining(long startPollEpochMilli, long maxPollDuration)  {
         return records.isEmpty() ? null : records;
     }
 
+    @Override
+    public void commitRecord(SourceRecord record, RecordMetadata metadata) throws InterruptedException {

Review comment:
       done in https://github.com/apache/camel-kafka-connector/pull/1093




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org