You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/04/14 11:30:29 UTC

[incubator-inlong] branch master updated: [INLONG-3697][Manager] Replenish manager client examples (#3707)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 5c9a86615 [INLONG-3697][Manager] Replenish manager client examples (#3707)
5c9a86615 is described below

commit 5c9a86615176a7d2f96f735211577f24800be0cd
Author: kipshi <48...@users.noreply.github.com>
AuthorDate: Thu Apr 14 19:30:25 2022 +0800

    [INLONG-3697][Manager] Replenish manager client examples (#3707)
---
 ...2HiveExample.java => AutoPush2HiveExample.java} | 24 ++++++++------------
 .../inlong/manager/client/Binlog2KafkaExample.java | 10 ++++-----
 ...afka2HiveExample.java => File2HiveExample.java} | 26 +++++++++-------------
 .../inlong/manager/client/Kafka2HiveExample.java   |  4 ++--
 4 files changed, 27 insertions(+), 37 deletions(-)

diff --git a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/Kafka2HiveExample.java b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/AutoPush2HiveExample.java
similarity index 92%
copy from inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/Kafka2HiveExample.java
copy to inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/AutoPush2HiveExample.java
index 51fccf7d5..e0857e5c0 100644
--- a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/Kafka2HiveExample.java
+++ b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/AutoPush2HiveExample.java
@@ -17,10 +17,8 @@
 
 package org.apache.inlong.manager.client;
 
-import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.compress.utils.Lists;
 import org.apache.inlong.manager.client.api.ClientConfiguration;
-import org.apache.inlong.manager.client.api.DataFormat;
 import org.apache.inlong.manager.client.api.DataSeparator;
 import org.apache.inlong.manager.client.api.FlinkSortBaseConf;
 import org.apache.inlong.manager.client.api.InlongClient;
@@ -34,7 +32,7 @@ import org.apache.inlong.manager.client.api.SinkField;
 import org.apache.inlong.manager.client.api.StreamField;
 import org.apache.inlong.manager.client.api.auth.DefaultAuthentication;
 import org.apache.inlong.manager.client.api.sink.HiveSink;
-import org.apache.inlong.manager.client.api.source.KafkaSource;
+import org.apache.inlong.manager.client.api.source.AutoPushSource;
 import org.apache.inlong.manager.common.enums.FieldType;
 import org.apache.inlong.manager.common.enums.FileFormat;
 import org.apache.shiro.util.Assert;
@@ -47,8 +45,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
-@Slf4j
-public class Kafka2HiveExample {
+public class AutoPush2HiveExample {
 
     // Manager web url
     public static String SERVICE_URL = "127.0.0.1:8083";
@@ -84,7 +81,7 @@ public class Kafka2HiveExample {
             InlongStreamConf streamConf = createStreamConf();
             InlongStreamBuilder streamBuilder = group.createStream(streamConf);
             streamBuilder.fields(createStreamFields());
-            streamBuilder.source(createKafkaSource());
+            streamBuilder.source(createAutoPushSource());
             streamBuilder.sink(createHiveSink());
             streamBuilder.initOrUpdate();
             // start group
@@ -154,13 +151,10 @@ public class Kafka2HiveExample {
         return streamConf;
     }
 
-    public KafkaSource createKafkaSource() {
-        KafkaSource kafkaSource = new KafkaSource();
-        kafkaSource.setBootstrapServers("{kafka.bootstrap}");
-        kafkaSource.setTopic("{kafka.topic}");
-        kafkaSource.setSourceName("{kafka.source.name}");
-        kafkaSource.setDataFormat(DataFormat.JSON);
-        return kafkaSource;
+    private AutoPushSource createAutoPushSource() {
+        AutoPushSource autoPushSource = new AutoPushSource();
+        autoPushSource.setDataProxyGroup("{Dataproxy.group}");
+        return autoPushSource;
     }
 
     private HiveSink createHiveSink() {
@@ -184,10 +178,10 @@ public class Kafka2HiveExample {
         return hiveSink;
     }
 
-    public List<StreamField> createStreamFields() {
+    private List<StreamField> createStreamFields() {
         List<StreamField> streamFieldList = Lists.newArrayList();
         streamFieldList.add(new StreamField(0, FieldType.STRING, "name", null, null));
         streamFieldList.add(new StreamField(1, FieldType.INT, "age", null, null));
         return streamFieldList;
     }
-}
\ No newline at end of file
+}
diff --git a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/Binlog2KafkaExample.java b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/Binlog2KafkaExample.java
index eccb9c618..2e1af4327 100644
--- a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/Binlog2KafkaExample.java
+++ b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/Binlog2KafkaExample.java
@@ -109,7 +109,7 @@ public class Binlog2KafkaExample {
     }
 
     @Test
-    public void testRestartGroup() {
+    public void testSuspendGroup() {
         ClientConfiguration configuration = new ClientConfiguration();
         configuration.setWriteTimeout(10);
         configuration.setReadTimeout(10);
@@ -120,7 +120,7 @@ public class Binlog2KafkaExample {
         InlongGroupConf groupConf = createGroupConf();
         try {
             InlongGroup group = inlongClient.forGroup(groupConf);
-            InlongGroupContext groupContext = group.restart(true);
+            InlongGroupContext groupContext = group.suspend(true);
             Assert.notNull(groupContext);
         } catch (Exception e) {
             e.printStackTrace();
@@ -128,7 +128,7 @@ public class Binlog2KafkaExample {
     }
 
     @Test
-    public void testSuspendGroup() {
+    public void testRestartGroup() {
         ClientConfiguration configuration = new ClientConfiguration();
         configuration.setWriteTimeout(10);
         configuration.setReadTimeout(10);
@@ -139,7 +139,7 @@ public class Binlog2KafkaExample {
         InlongGroupConf groupConf = createGroupConf();
         try {
             InlongGroup group = inlongClient.forGroup(groupConf);
-            InlongGroupContext groupContext = group.suspend(true);
+            InlongGroupContext groupContext = group.restart(true);
             Assert.notNull(groupContext);
         } catch (Exception e) {
             e.printStackTrace();
@@ -186,7 +186,7 @@ public class Binlog2KafkaExample {
         return streamConf;
     }
 
-    public MySQLBinlogSource createMysqlSource() {
+    private MySQLBinlogSource createMysqlSource() {
         MySQLBinlogSource mySQLBinlogSource = new MySQLBinlogSource();
         mySQLBinlogSource.setDbNames(Arrays.asList("{db.name}"));
         mySQLBinlogSource.setHostname("{db.url}");
diff --git a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/Kafka2HiveExample.java b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/File2HiveExample.java
similarity index 92%
copy from inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/Kafka2HiveExample.java
copy to inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/File2HiveExample.java
index 51fccf7d5..9f3d6b9f8 100644
--- a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/Kafka2HiveExample.java
+++ b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/File2HiveExample.java
@@ -17,10 +17,8 @@
 
 package org.apache.inlong.manager.client;
 
-import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.compress.utils.Lists;
 import org.apache.inlong.manager.client.api.ClientConfiguration;
-import org.apache.inlong.manager.client.api.DataFormat;
 import org.apache.inlong.manager.client.api.DataSeparator;
 import org.apache.inlong.manager.client.api.FlinkSortBaseConf;
 import org.apache.inlong.manager.client.api.InlongClient;
@@ -34,7 +32,7 @@ import org.apache.inlong.manager.client.api.SinkField;
 import org.apache.inlong.manager.client.api.StreamField;
 import org.apache.inlong.manager.client.api.auth.DefaultAuthentication;
 import org.apache.inlong.manager.client.api.sink.HiveSink;
-import org.apache.inlong.manager.client.api.source.KafkaSource;
+import org.apache.inlong.manager.client.api.source.AgentFileSource;
 import org.apache.inlong.manager.common.enums.FieldType;
 import org.apache.inlong.manager.common.enums.FileFormat;
 import org.apache.shiro.util.Assert;
@@ -47,8 +45,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
-@Slf4j
-public class Kafka2HiveExample {
+public class File2HiveExample {
 
     // Manager web url
     public static String SERVICE_URL = "127.0.0.1:8083";
@@ -84,7 +81,7 @@ public class Kafka2HiveExample {
             InlongStreamConf streamConf = createStreamConf();
             InlongStreamBuilder streamBuilder = group.createStream(streamConf);
             streamBuilder.fields(createStreamFields());
-            streamBuilder.source(createKafkaSource());
+            streamBuilder.source(createAgentFileSource());
             streamBuilder.sink(createHiveSink());
             streamBuilder.initOrUpdate();
             // start group
@@ -154,13 +151,12 @@ public class Kafka2HiveExample {
         return streamConf;
     }
 
-    public KafkaSource createKafkaSource() {
-        KafkaSource kafkaSource = new KafkaSource();
-        kafkaSource.setBootstrapServers("{kafka.bootstrap}");
-        kafkaSource.setTopic("{kafka.topic}");
-        kafkaSource.setSourceName("{kafka.source.name}");
-        kafkaSource.setDataFormat(DataFormat.JSON);
-        return kafkaSource;
+    private AgentFileSource createAgentFileSource() {
+        AgentFileSource agentFileSource = new AgentFileSource();
+        agentFileSource.setAgentIp("{agent.ip}");
+        agentFileSource.setPattern("/a/b/*.txt");
+        agentFileSource.setTimeOffset("-1h");
+        return agentFileSource;
     }
 
     private HiveSink createHiveSink() {
@@ -184,10 +180,10 @@ public class Kafka2HiveExample {
         return hiveSink;
     }
 
-    public List<StreamField> createStreamFields() {
+    private List<StreamField> createStreamFields() {
         List<StreamField> streamFieldList = Lists.newArrayList();
         streamFieldList.add(new StreamField(0, FieldType.STRING, "name", null, null));
         streamFieldList.add(new StreamField(1, FieldType.INT, "age", null, null));
         return streamFieldList;
     }
-}
\ No newline at end of file
+}
diff --git a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/Kafka2HiveExample.java b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/Kafka2HiveExample.java
index 51fccf7d5..4bd4b3573 100644
--- a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/Kafka2HiveExample.java
+++ b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/Kafka2HiveExample.java
@@ -154,7 +154,7 @@ public class Kafka2HiveExample {
         return streamConf;
     }
 
-    public KafkaSource createKafkaSource() {
+    private KafkaSource createKafkaSource() {
         KafkaSource kafkaSource = new KafkaSource();
         kafkaSource.setBootstrapServers("{kafka.bootstrap}");
         kafkaSource.setTopic("{kafka.topic}");
@@ -184,7 +184,7 @@ public class Kafka2HiveExample {
         return hiveSink;
     }
 
-    public List<StreamField> createStreamFields() {
+    private List<StreamField> createStreamFields() {
         List<StreamField> streamFieldList = Lists.newArrayList();
         streamFieldList.add(new StreamField(0, FieldType.STRING, "name", null, null));
         streamFieldList.add(new StreamField(1, FieldType.INT, "age", null, null));