You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/04/14 11:30:29 UTC
[incubator-inlong] branch master updated: [INLONG-3697][Manager] Replenish manager client examples (#3707)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 5c9a86615 [INLONG-3697][Manager] Replenish manager client examples (#3707)
5c9a86615 is described below
commit 5c9a86615176a7d2f96f735211577f24800be0cd
Author: kipshi <48...@users.noreply.github.com>
AuthorDate: Thu Apr 14 19:30:25 2022 +0800
[INLONG-3697][Manager] Replenish manager client examples (#3707)
---
...2HiveExample.java => AutoPush2HiveExample.java} | 24 ++++++++------------
.../inlong/manager/client/Binlog2KafkaExample.java | 10 ++++-----
...afka2HiveExample.java => File2HiveExample.java} | 26 +++++++++-------------
.../inlong/manager/client/Kafka2HiveExample.java | 4 ++--
4 files changed, 27 insertions(+), 37 deletions(-)
diff --git a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/Kafka2HiveExample.java b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/AutoPush2HiveExample.java
similarity index 92%
copy from inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/Kafka2HiveExample.java
copy to inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/AutoPush2HiveExample.java
index 51fccf7d5..e0857e5c0 100644
--- a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/Kafka2HiveExample.java
+++ b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/AutoPush2HiveExample.java
@@ -17,10 +17,8 @@
package org.apache.inlong.manager.client;
-import lombok.extern.slf4j.Slf4j;
import org.apache.commons.compress.utils.Lists;
import org.apache.inlong.manager.client.api.ClientConfiguration;
-import org.apache.inlong.manager.client.api.DataFormat;
import org.apache.inlong.manager.client.api.DataSeparator;
import org.apache.inlong.manager.client.api.FlinkSortBaseConf;
import org.apache.inlong.manager.client.api.InlongClient;
@@ -34,7 +32,7 @@ import org.apache.inlong.manager.client.api.SinkField;
import org.apache.inlong.manager.client.api.StreamField;
import org.apache.inlong.manager.client.api.auth.DefaultAuthentication;
import org.apache.inlong.manager.client.api.sink.HiveSink;
-import org.apache.inlong.manager.client.api.source.KafkaSource;
+import org.apache.inlong.manager.client.api.source.AutoPushSource;
import org.apache.inlong.manager.common.enums.FieldType;
import org.apache.inlong.manager.common.enums.FileFormat;
import org.apache.shiro.util.Assert;
@@ -47,8 +45,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
-@Slf4j
-public class Kafka2HiveExample {
+public class AutoPush2HiveExample {
// Manager web url
public static String SERVICE_URL = "127.0.0.1:8083";
@@ -84,7 +81,7 @@ public class Kafka2HiveExample {
InlongStreamConf streamConf = createStreamConf();
InlongStreamBuilder streamBuilder = group.createStream(streamConf);
streamBuilder.fields(createStreamFields());
- streamBuilder.source(createKafkaSource());
+ streamBuilder.source(createAutoPushSource());
streamBuilder.sink(createHiveSink());
streamBuilder.initOrUpdate();
// start group
@@ -154,13 +151,10 @@ public class Kafka2HiveExample {
return streamConf;
}
- public KafkaSource createKafkaSource() {
- KafkaSource kafkaSource = new KafkaSource();
- kafkaSource.setBootstrapServers("{kafka.bootstrap}");
- kafkaSource.setTopic("{kafka.topic}");
- kafkaSource.setSourceName("{kafka.source.name}");
- kafkaSource.setDataFormat(DataFormat.JSON);
- return kafkaSource;
+ private AutoPushSource createAutoPushSource() {
+ AutoPushSource autoPushSource = new AutoPushSource();
+ autoPushSource.setDataProxyGroup("{Dataproxy.group}");
+ return autoPushSource;
}
private HiveSink createHiveSink() {
@@ -184,10 +178,10 @@ public class Kafka2HiveExample {
return hiveSink;
}
- public List<StreamField> createStreamFields() {
+ private List<StreamField> createStreamFields() {
List<StreamField> streamFieldList = Lists.newArrayList();
streamFieldList.add(new StreamField(0, FieldType.STRING, "name", null, null));
streamFieldList.add(new StreamField(1, FieldType.INT, "age", null, null));
return streamFieldList;
}
-}
\ No newline at end of file
+}
diff --git a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/Binlog2KafkaExample.java b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/Binlog2KafkaExample.java
index eccb9c618..2e1af4327 100644
--- a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/Binlog2KafkaExample.java
+++ b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/Binlog2KafkaExample.java
@@ -109,7 +109,7 @@ public class Binlog2KafkaExample {
}
@Test
- public void testRestartGroup() {
+ public void testSuspendGroup() {
ClientConfiguration configuration = new ClientConfiguration();
configuration.setWriteTimeout(10);
configuration.setReadTimeout(10);
@@ -120,7 +120,7 @@ public class Binlog2KafkaExample {
InlongGroupConf groupConf = createGroupConf();
try {
InlongGroup group = inlongClient.forGroup(groupConf);
- InlongGroupContext groupContext = group.restart(true);
+ InlongGroupContext groupContext = group.suspend(true);
Assert.notNull(groupContext);
} catch (Exception e) {
e.printStackTrace();
@@ -128,7 +128,7 @@ public class Binlog2KafkaExample {
}
@Test
- public void testSuspendGroup() {
+ public void testRestartGroup() {
ClientConfiguration configuration = new ClientConfiguration();
configuration.setWriteTimeout(10);
configuration.setReadTimeout(10);
@@ -139,7 +139,7 @@ public class Binlog2KafkaExample {
InlongGroupConf groupConf = createGroupConf();
try {
InlongGroup group = inlongClient.forGroup(groupConf);
- InlongGroupContext groupContext = group.suspend(true);
+ InlongGroupContext groupContext = group.restart(true);
Assert.notNull(groupContext);
} catch (Exception e) {
e.printStackTrace();
@@ -186,7 +186,7 @@ public class Binlog2KafkaExample {
return streamConf;
}
- public MySQLBinlogSource createMysqlSource() {
+ private MySQLBinlogSource createMysqlSource() {
MySQLBinlogSource mySQLBinlogSource = new MySQLBinlogSource();
mySQLBinlogSource.setDbNames(Arrays.asList("{db.name}"));
mySQLBinlogSource.setHostname("{db.url}");
diff --git a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/Kafka2HiveExample.java b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/File2HiveExample.java
similarity index 92%
copy from inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/Kafka2HiveExample.java
copy to inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/File2HiveExample.java
index 51fccf7d5..9f3d6b9f8 100644
--- a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/Kafka2HiveExample.java
+++ b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/File2HiveExample.java
@@ -17,10 +17,8 @@
package org.apache.inlong.manager.client;
-import lombok.extern.slf4j.Slf4j;
import org.apache.commons.compress.utils.Lists;
import org.apache.inlong.manager.client.api.ClientConfiguration;
-import org.apache.inlong.manager.client.api.DataFormat;
import org.apache.inlong.manager.client.api.DataSeparator;
import org.apache.inlong.manager.client.api.FlinkSortBaseConf;
import org.apache.inlong.manager.client.api.InlongClient;
@@ -34,7 +32,7 @@ import org.apache.inlong.manager.client.api.SinkField;
import org.apache.inlong.manager.client.api.StreamField;
import org.apache.inlong.manager.client.api.auth.DefaultAuthentication;
import org.apache.inlong.manager.client.api.sink.HiveSink;
-import org.apache.inlong.manager.client.api.source.KafkaSource;
+import org.apache.inlong.manager.client.api.source.AgentFileSource;
import org.apache.inlong.manager.common.enums.FieldType;
import org.apache.inlong.manager.common.enums.FileFormat;
import org.apache.shiro.util.Assert;
@@ -47,8 +45,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
-@Slf4j
-public class Kafka2HiveExample {
+public class File2HiveExample {
// Manager web url
public static String SERVICE_URL = "127.0.0.1:8083";
@@ -84,7 +81,7 @@ public class Kafka2HiveExample {
InlongStreamConf streamConf = createStreamConf();
InlongStreamBuilder streamBuilder = group.createStream(streamConf);
streamBuilder.fields(createStreamFields());
- streamBuilder.source(createKafkaSource());
+ streamBuilder.source(createAgentFileSource());
streamBuilder.sink(createHiveSink());
streamBuilder.initOrUpdate();
// start group
@@ -154,13 +151,12 @@ public class Kafka2HiveExample {
return streamConf;
}
- public KafkaSource createKafkaSource() {
- KafkaSource kafkaSource = new KafkaSource();
- kafkaSource.setBootstrapServers("{kafka.bootstrap}");
- kafkaSource.setTopic("{kafka.topic}");
- kafkaSource.setSourceName("{kafka.source.name}");
- kafkaSource.setDataFormat(DataFormat.JSON);
- return kafkaSource;
+ private AgentFileSource createAgentFileSource() {
+ AgentFileSource agentFileSource = new AgentFileSource();
+ agentFileSource.setAgentIp("{agent.ip}");
+ agentFileSource.setPattern("/a/b/*.txt");
+ agentFileSource.setTimeOffset("-1h");
+ return agentFileSource;
}
private HiveSink createHiveSink() {
@@ -184,10 +180,10 @@ public class Kafka2HiveExample {
return hiveSink;
}
- public List<StreamField> createStreamFields() {
+ private List<StreamField> createStreamFields() {
List<StreamField> streamFieldList = Lists.newArrayList();
streamFieldList.add(new StreamField(0, FieldType.STRING, "name", null, null));
streamFieldList.add(new StreamField(1, FieldType.INT, "age", null, null));
return streamFieldList;
}
-}
\ No newline at end of file
+}
diff --git a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/Kafka2HiveExample.java b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/Kafka2HiveExample.java
index 51fccf7d5..4bd4b3573 100644
--- a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/Kafka2HiveExample.java
+++ b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/Kafka2HiveExample.java
@@ -154,7 +154,7 @@ public class Kafka2HiveExample {
return streamConf;
}
- public KafkaSource createKafkaSource() {
+ private KafkaSource createKafkaSource() {
KafkaSource kafkaSource = new KafkaSource();
kafkaSource.setBootstrapServers("{kafka.bootstrap}");
kafkaSource.setTopic("{kafka.topic}");
@@ -184,7 +184,7 @@ public class Kafka2HiveExample {
return hiveSink;
}
- public List<StreamField> createStreamFields() {
+ private List<StreamField> createStreamFields() {
List<StreamField> streamFieldList = Lists.newArrayList();
streamFieldList.add(new StreamField(0, FieldType.STRING, "name", null, null));
streamFieldList.add(new StreamField(1, FieldType.INT, "age", null, null));