Posted to commits@inlong.apache.org by GitBox <gi...@apache.org> on 2022/09/05 12:40:55 UTC

[GitHub] [inlong] iamsee123 opened a new pull request, #5783: [INLONG-4987] Support CloudEvents Source

iamsee123 opened a new pull request, #5783:
URL: https://github.com/apache/inlong/pull/5783

   ### Prepare a Pull Request
   
   - Fixes #4987
   
   ### Motivation
   
   CloudEvents is a specification for describing event data in common formats to provide interoperability across services, platforms and systems.
   
   ### Modifications
   
   + Add a utility class that converts data in CloudEvents format into a default message
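
As a rough illustration of the kind of conversion described above, the sketch below copies an event's context attributes into a header map and its data into a message body. `SimpleEvent` and `SimpleMessage` are hypothetical stand-ins for `io.cloudevents.CloudEvent` and InLong's `DefaultMessage`, and the header key names are assumptions; none of this is taken from the PR's actual `CloudEventsConverterUtil`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a CloudEvent: required context attributes plus an optional payload.
final class SimpleEvent {
    final String id;
    final String source;
    final String type;
    final String dataContentType; // may be null
    final byte[] data;            // may be null

    SimpleEvent(String id, String source, String type, String dataContentType, byte[] data) {
        this.id = id;
        this.source = source;
        this.type = type;
        this.dataContentType = dataContentType;
        this.data = data;
    }
}

// Hypothetical stand-in for a body-plus-headers message such as DefaultMessage.
final class SimpleMessage {
    final byte[] body;
    final Map<String, String> header;

    SimpleMessage(byte[] body, Map<String, String> header) {
        this.body = body;
        this.header = header;
    }
}

final class EventConverterSketch {

    // Copies the context attributes into the header and the payload into the body.
    static SimpleMessage toMessage(SimpleEvent event) {
        Map<String, String> header = new LinkedHashMap<>();
        header.put("ce_specversion", "1.0"); // assumed key names, not from the PR
        header.put("ce_id", event.id);
        header.put("ce_source", event.source);
        header.put("ce_type", event.type);
        if (event.dataContentType != null) {
            header.put("content-type", event.dataContentType);
        }
        // An event without data becomes a message with an empty body.
        byte[] body = event.data == null ? new byte[0] : event.data.clone();
        return new SimpleMessage(body, header);
    }
}
```

Keeping the attributes in the header rather than in the body means downstream consumers that only read the body still see the original payload unchanged.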
   
   ### Verifying this change
   
   - [ ] This change is a trivial rework/code cleanup without any test coverage.
   
   - [ ] This change is already covered by existing tests, such as:
     *(please describe tests)*
   
   - [x] This change added tests and can be verified as follows:
   
    - TestCloudEventsConverterUtil: tests the converter utility with Kafka
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] haibo-duan commented on a diff in pull request #5783: [INLONG-4987][Agent] Support CloudEvents Source

Posted by GitBox <gi...@apache.org>.
haibo-duan commented on code in PR #5783:
URL: https://github.com/apache/inlong/pull/5783#discussion_r994569496


##########
inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestCloudEventsConverterUtil.java:
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.sources;
+
+import io.cloudevents.CloudEvent;
+import io.cloudevents.core.builder.CloudEventBuilder;
+import io.cloudevents.core.message.Encoding;
+import io.cloudevents.jackson.JsonFormat;
+import io.cloudevents.kafka.CloudEventDeserializer;
+import io.cloudevents.kafka.CloudEventSerializer;
+import org.apache.inlong.agent.message.DefaultMessage;
+import org.apache.inlong.agent.plugin.Message;
+import org.apache.inlong.agent.plugin.utils.CloudEventsConverterUtil;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+
+/**
+ * test cloudEvents converter util
+ */
+public class TestCloudEventsConverterUtil {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(TestCloudEventsConverterUtil.class);
+
+    @Before
+    public void createKafkaProducer() {
+        //Basic producer configuration
+        Properties props = new Properties();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.17.0.3:9092");

Review Comment:
   Please change the IP to 127.0.0.1 rather than using a real host.
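
One possible way to address this is to keep any address out of the test body and default to the loopback address. A minimal sketch, assuming a hypothetical system property `test.kafka.bootstrap` that is not part of this PR; the key `bootstrap.servers` is the string value of Kafka's `ProducerConfig.BOOTSTRAP_SERVERS_CONFIG`:

```java
import java.util.Properties;

final class KafkaTestConfigSketch {

    // Hypothetical property name; lets a developer point the test at another broker.
    static final String BOOTSTRAP_PROPERTY = "test.kafka.bootstrap";

    // Loopback default, so no real host is committed to the repository.
    static final String DEFAULT_BOOTSTRAP = "127.0.0.1:9092";

    // Builds the shared part of the producer and consumer configuration.
    static Properties baseProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", System.getProperty(BOOTSTRAP_PROPERTY, DEFAULT_BOOTSTRAP));
        return props;
    }
}
```

Running the test with `-Dtest.kafka.bootstrap=<host>:<port>` would then override the default without a code change.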



##########
inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestCloudEventsConverterUtil.java:
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.sources;
+
+import io.cloudevents.CloudEvent;
+import io.cloudevents.core.builder.CloudEventBuilder;
+import io.cloudevents.core.message.Encoding;
+import io.cloudevents.jackson.JsonFormat;
+import io.cloudevents.kafka.CloudEventDeserializer;
+import io.cloudevents.kafka.CloudEventSerializer;
+import org.apache.inlong.agent.message.DefaultMessage;
+import org.apache.inlong.agent.plugin.Message;
+import org.apache.inlong.agent.plugin.utils.CloudEventsConverterUtil;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+
+/**
+ * test cloudEvents converter util
+ */
+public class TestCloudEventsConverterUtil {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(TestCloudEventsConverterUtil.class);
+
+    @Before
+    public void createKafkaProducer() {
+        //Basic producer configuration
+        Properties props = new Properties();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.17.0.3:9092");
+        props.put(ProducerConfig.CLIENT_ID_CONFIG, "sample-cloudevents-producer");
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CloudEventSerializer.class);
+        props.put(CloudEventSerializer.ENCODING_CONFIG, Encoding.STRUCTURED);
+        props.put(CloudEventSerializer.EVENT_FORMAT_CONFIG, JsonFormat.CONTENT_TYPE);
+
+        //create the kafkaProducer
+        KafkaProducer<String, CloudEvent> producer = new KafkaProducer<String, CloudEvent>(props);
+        String topic = "test";
+
+        CloudEventBuilder eventTemplate = CloudEventBuilder.v1()
+                .withSource(URI.create("https://github.com/inLong"))
+                .withType("producer.example");
+        for (int i = 0; i < 5; i++) {
+            try {
+                String id = UUID.randomUUID().toString();
+                String data = "Event number " + i;
+
+                CloudEvent event = eventTemplate.newBuilder()
+                        .withId(id)
+                        .withData("text/plain", data.getBytes())
+                        .build();
+
+                RecordMetadata metadata = producer.send(new ProducerRecord<>(topic, id, event)).get();
+                LOGGER.info("Record sent to partition {} with offset {}. \n", metadata.partition(), metadata.offset());
+            } catch (Exception e) {
+                LOGGER.error("Error while trying to send the record");
+                e.printStackTrace();
+                return;
+            }
+        }
+
+        producer.flush();
+        producer.close();
+    }
+
+    /**
+     * Just using in local test
+     */
+    @Ignore
+    public void testConverter() {
+        Properties props = new Properties();
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.17.0.3:9092");

Review Comment:
   Please change the IP to 127.0.0.1 rather than using a real host.





[GitHub] [inlong] xuesongxs commented on pull request #5783: [INLONG-4987][Agent] Support CloudEvents Source

Posted by GitBox <gi...@apache.org>.
xuesongxs commented on PR #5783:
URL: https://github.com/apache/inlong/pull/5783#issuecomment-1280165051

   The full InLong pipeline must support the CloudEvents protocol.
   You can join the WeChat group to discuss.
   
   
    
   From: iamsee123
   Date: 2022-10-15 22:46
   To: apache/inlong
   CC: xuesongxs; Comment
   Subject: Re: [apache/inlong] [INLONG-4987][Agent] Support CloudEvents Source (PR #5783)
   > 1. The implementation of this PR is a little different from what I had in mind. My goal is for InLong to start an HTTP service, receive CloudEvents data, and write it to the Pulsar topic, with the data stored in the topic in CloudEvents structure.
   > 2. If you want to use the InlongMsg structure uniformly, use CloudEvents as the body of InlongMsg inside InLong.
   > 3. CloudEvents can be transmitted over the HTTP, TCP, and gRPC protocols.

   Got it. I will rework the code later.
   




[GitHub] [inlong] GanfengTan commented on pull request #5783: [INLONG-4987][Agent] Support CloudEvents Source

Posted by GitBox <gi...@apache.org>.
GanfengTan commented on PR #5783:
URL: https://github.com/apache/inlong/pull/5783#issuecomment-1237617502

   Thanks for your PR, @iamsee123. Would it be more appropriate as an SDK?




[GitHub] [inlong] lucaspeng12138 commented on pull request #5783: [INLONG-4987][Agent] Support CloudEvents Source

Posted by GitBox <gi...@apache.org>.
lucaspeng12138 commented on PR #5783:
URL: https://github.com/apache/inlong/pull/5783#issuecomment-1364830068

   [Auto-reply]: I have received your email and will reply as soon as possible. Thank you.




[GitHub] [inlong] dockerzhang closed pull request #5783: [INLONG-4987][Agent] Support CloudEvents Source

Posted by "dockerzhang (via GitHub)" <gi...@apache.org>.
dockerzhang closed pull request #5783: [INLONG-4987][Agent] Support CloudEvents Source
URL: https://github.com/apache/inlong/pull/5783




[GitHub] [inlong] iamsee123 commented on pull request #5783: [INLONG-4987][Agent] Support CloudEvents Source

Posted by GitBox <gi...@apache.org>.
iamsee123 commented on PR #5783:
URL: https://github.com/apache/inlong/pull/5783#issuecomment-1279759390

   > 1. The implementation of this PR is a little different from what I had in mind. My goal is for InLong to start an HTTP service, receive CloudEvents data, and write it to the Pulsar topic, with the data stored in the topic in CloudEvents structure.
   > 2. If you want to use the InlongMsg structure uniformly, use CloudEvents as the body of InlongMsg inside InLong.
   > 3. CloudEvents can be transmitted over the HTTP, TCP, and gRPC protocols.
   
   Got it. I will rework the code later.




[GitHub] [inlong] github-actions[bot] commented on pull request #5783: [INLONG-4987][Agent] Support CloudEvents Source

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #5783:
URL: https://github.com/apache/inlong/pull/5783#issuecomment-1495228342

   This PR is stale because it has been open for 60 days with no activity.




[GitHub] [inlong] xuesongxs commented on pull request #5783: [INLONG-4987][Agent] Support CloudEvents Source

Posted by GitBox <gi...@apache.org>.
xuesongxs commented on PR #5783:
URL: https://github.com/apache/inlong/pull/5783#issuecomment-1278689929

   1. The implementation of this PR is a little different from what I had in mind. My goal is for InLong to start an HTTP service, receive CloudEvents data, and write it to the Pulsar topic, with the data stored in the topic in CloudEvents structure.
   2. If you want to use the InlongMsg structure uniformly, use CloudEvents as the body of InlongMsg inside InLong.
   3. CloudEvents can be transmitted over the HTTP, TCP, and gRPC protocols.
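
To make the first two points more concrete, here is a rough sketch of the header-handling part of such an HTTP endpoint, with the server wiring left out. It assumes binary content mode, where the CloudEvents HTTP binding carries context attributes in `ce-` prefixed headers and the data in the request body. `EventEnvelope` and `TopicSink` are hypothetical names standing in for an InlongMsg-style wrapper and a Pulsar producer; they are not InLong or Pulsar APIs, and the sketch does not percent-decode header values.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical envelope: CloudEvents attributes kept alongside the raw payload.
final class EventEnvelope {
    final Map<String, String> attributes;
    final byte[] payload;

    EventEnvelope(Map<String, String> attributes, byte[] payload) {
        this.attributes = attributes;
        this.payload = payload;
    }
}

// Hypothetical sink standing in for a Pulsar producer.
interface TopicSink {
    void write(String topic, EventEnvelope envelope);
}

final class BinaryModeReceiverSketch {

    // Context attributes that every CloudEvent must carry.
    private static final String[] REQUIRED = {"specversion", "id", "source", "type"};

    // Extracts ce-* headers from a binary-mode request and validates the required ones.
    static EventEnvelope parse(Map<String, String> httpHeaders, byte[] body) {
        Map<String, String> attrs = new LinkedHashMap<>();
        for (Map.Entry<String, String> h : httpHeaders.entrySet()) {
            // HTTP header names are case-insensitive, so normalize before matching.
            String name = h.getKey().toLowerCase(Locale.ROOT);
            if (name.startsWith("ce-")) {
                attrs.put(name.substring(3), h.getValue());
            } else if (name.equals("content-type")) {
                attrs.put("datacontenttype", h.getValue());
            }
        }
        for (String required : REQUIRED) {
            if (!attrs.containsKey(required)) {
                throw new IllegalArgumentException("missing CloudEvents attribute: " + required);
            }
        }
        return new EventEnvelope(attrs, body == null ? new byte[0] : body.clone());
    }

    // Parses one request and forwards the envelope to the given topic.
    static void handle(Map<String, String> httpHeaders, byte[] body, String topic, TopicSink sink) {
        sink.write(topic, parse(httpHeaders, body));
    }
}
```

Because the envelope keeps the attributes next to the payload, the stored record still carries the full CloudEvents structure, which is the intent of point 2.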




[GitHub] [inlong] iamsee123 commented on pull request #5783: [INLONG-4987][Agent] Support CloudEvents Source

Posted by GitBox <gi...@apache.org>.
iamsee123 commented on PR #5783:
URL: https://github.com/apache/inlong/pull/5783#issuecomment-1245216066

   > Thanks for your PR, @iamsee123. Would it be more appropriate as an SDK?
   
   I think a utility class is enough, because it is only used to convert data from the CloudEvents format into the DefaultMessage format.

