You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by rg...@apache.org on 2022/10/08 04:00:21 UTC
[flume] 08/14: Spring Boot support
This is an automated email from the ASF dual-hosted git repository.
rgoers pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/flume.git
commit 1521f5c32e73b01faafec214a628bbf477deb8a4
Author: rgoers <ra...@dslextreme.com>
AuthorDate: Thu Sep 22 12:00:48 2022 -0700
Spring Boot support
---
flume-spring-boot/pom.xml | 94 ++++++++++++++++++
.../org/apache/flume/spring/boot/Application.java | 33 +++++++
.../boot/config/AbstractFlumeConfiguration.java | 107 +++++++++++++++++++++
.../spring/boot/config/SpringConfiguration.java | 59 ++++++++++++
.../flume/spring/boot/runner/SpringFlume.java | 59 ++++++++++++
.../java/org/apache/flume/spring/boot/AppTest.java | 43 +++++++++
.../apache/flume/spring/boot/config/AppConfig.java | 75 +++++++++++++++
.../src/test/resources/application.yml | 8 ++
.../src/test/resources/log4j2-test.xml | 31 ++++++
pom.xml | 3 +-
10 files changed, 511 insertions(+), 1 deletion(-)
diff --git a/flume-spring-boot/pom.xml b/flume-spring-boot/pom.xml
new file mode 100644
index 000000000..bfd058fcc
--- /dev/null
+++ b/flume-spring-boot/pom.xml
@@ -0,0 +1,94 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>flume-parent</artifactId>
+ <groupId>org.apache.flume</groupId>
+ <version>1.11.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>flume-spring-boot</artifactId>
+ <name>Flume Spring Boot Support</name>
+
+ <properties>
+ <module.name>org.apache.flume.spring.boot</module.name>
+ <spring-boot.version>2.7.3</spring-boot.version>
+ </properties>
+ <dependencyManagement>
+ <dependencies>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-dependencies</artifactId>
+ <version>${spring-boot.version}</version>
+ <type>pom</type>
+ <scope>import</scope>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flume</groupId>
+ <artifactId>flume-ng-configuration</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flume</groupId>
+ <artifactId>flume-ng-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flume</groupId>
+ <artifactId>flume-ng-node</artifactId>
+ </dependency>
+ <!-- Spring Boot dependencies; versions come from the spring-boot-dependencies POM imported above -->
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-actuator</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-logging</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-web</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-test</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <!-- Logging: Log4j 2 starter, replacing the spring-boot-starter-logging excluded above -->
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-log4j2</artifactId>
+ </dependency>
+ </dependencies>
+ <build>
+
+ </build>
+
+</project>
\ No newline at end of file
diff --git a/flume-spring-boot/src/main/java/org/apache/flume/spring/boot/Application.java b/flume-spring-boot/src/main/java/org/apache/flume/spring/boot/Application.java
new file mode 100644
index 000000000..4edbad5f5
--- /dev/null
+++ b/flume-spring-boot/src/main/java/org/apache/flume/spring/boot/Application.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+
+package org.apache.flume.spring.boot;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+
+/**
+ * Spring Boot entry point for the Flume Spring Boot module.
+ * <p>
+ * Component scanning is limited to the {@code org.apache.flume.spring.boot} package tree and
+ * {@code @EnableConfigurationProperties} is declared on the class. {@link #main(String[])}
+ * hands this class and the command-line arguments to {@code SpringApplication.run}.
+ */
+@SpringBootApplication(scanBasePackages = {"org.apache.flume.spring.boot"})
+@EnableConfigurationProperties
+public class Application {
+ public static void main(String[] args) {
+ SpringApplication.run(Application.class, args);
+ }
+}
diff --git a/flume-spring-boot/src/main/java/org/apache/flume/spring/boot/config/AbstractFlumeConfiguration.java b/flume-spring-boot/src/main/java/org/apache/flume/spring/boot/config/AbstractFlumeConfiguration.java
new file mode 100644
index 000000000..0846e46dd
--- /dev/null
+++ b/flume-spring-boot/src/main/java/org/apache/flume/spring/boot/config/AbstractFlumeConfiguration.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.flume.spring.boot.config;
+
+import org.apache.flume.Channel;
+import org.apache.flume.ChannelSelector;
+import org.apache.flume.Context;
+import org.apache.flume.FlumeException;
+import org.apache.flume.Sink;
+import org.apache.flume.Source;
+import org.apache.flume.SourceRunner;
+import org.apache.flume.channel.ChannelProcessor;
+import org.apache.flume.conf.Configurables;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Base class offering helper methods for assembling Flume components programmatically.
+ * <p>
+ * Each {@code configure*} helper instantiates the requested class through its no-argument
+ * constructor, applies the supplied name, configures the instance from a {@link Context} built
+ * out of the parameter map and wires in the collaborator it was given. Instantiation failures
+ * are reported as {@link FlumeException}.
+ */
+public abstract class AbstractFlumeConfiguration {
+
+  /**
+   * Creates, names and configures a channel.
+   *
+   * @param name The name to assign to the channel.
+   * @param clazz The channel implementation; must have a no-argument constructor.
+   * @param params The configuration parameters; {@code null} yields an empty Context.
+   * @param <T> The channel type.
+   * @return the configured channel.
+   * @throws FlumeException if the channel cannot be instantiated.
+   */
+  protected <T extends Channel> T configureChannel(final String name, final Class<T> clazz,
+      final Map<String, String> params) {
+    T channel = instantiate(clazz, "channel " + name);
+    channel.setName(name);
+    Configurables.configure(channel, createContext(params));
+    return channel;
+  }
+
+  /**
+   * Creates, names and configures a source, attaches a {@link ChannelProcessor} built on the
+   * given selector and wraps the source in a {@link SourceRunner}.
+   *
+   * @param name The name to assign to the source.
+   * @param clazz The source implementation; must have a no-argument constructor.
+   * @param selector The channel selector handed to the source's ChannelProcessor.
+   * @param params The configuration parameters; {@code null} yields an empty Context.
+   * @param <T> The source type.
+   * @return the SourceRunner obtained from {@code SourceRunner.forSource}.
+   * @throws FlumeException if the source cannot be instantiated.
+   */
+  protected <T extends Source> SourceRunner configureSource(final String name, final Class<T> clazz,
+      final ChannelSelector selector, final Map<String, String> params) {
+    T source = instantiate(clazz, "source " + name);
+    source.setName(name);
+    Configurables.configure(source, createContext(params));
+    source.setChannelProcessor(new ChannelProcessor(selector));
+    return SourceRunner.forSource(source);
+  }
+
+  /**
+   * Creates, names and configures a sink and binds it to the given channel.
+   *
+   * @param name The name to assign to the sink.
+   * @param clazz The sink implementation; must have a no-argument constructor.
+   * @param channel The channel the sink is attached to.
+   * @param params The configuration parameters; {@code null} yields an empty Context.
+   * @param <T> The sink type.
+   * @return the configured sink.
+   * @throws FlumeException if the sink cannot be instantiated.
+   */
+  protected <T extends Sink> Sink configureSink(final String name, final Class<T> clazz,
+      final Channel channel,
+      final Map<String, String> params) {
+    T sink = instantiate(clazz, "sink " + name);
+    sink.setName(name);
+    Configurables.configure(sink, createContext(params));
+    sink.setChannel(channel);
+    return sink;
+  }
+
+  /**
+   * Creates and configures a channel selector.
+   *
+   * @param clazz The selector implementation; must have a no-argument constructor.
+   * @param params The configuration parameters; {@code null} yields an empty Context.
+   * @return the configured selector.
+   * @throws FlumeException if the selector cannot be instantiated.
+   */
+  protected ChannelSelector createChannelSelector(Class<? extends ChannelSelector> clazz,
+      Map<String, String> params) {
+    ChannelSelector selector = instantiate(clazz, "channel selector " + clazz.getName());
+    Configurables.configure(selector, createContext(params));
+    return selector;
+  }
+
+  /**
+   * Creates a List from a Varargs array.
+   *
+   * @param items The items to add to the list.
+   * @param <T> The type of objects in the List.
+   * @return a List containing the supplied items.
+   */
+  @SafeVarargs
+  protected final <T> List<T> listOf(T... items) {
+    return Arrays.asList(items);
+  }
+
+  /**
+   * Instantiates {@code clazz} through its no-argument constructor.
+   * <p>
+   * Uses {@code getDeclaredConstructor().newInstance()} rather than the deprecated
+   * {@code Class.newInstance()}, so an exception thrown by the constructor arrives wrapped in an
+   * {@code InvocationTargetException} instead of being propagated unchecked.
+   *
+   * @param clazz The class to instantiate.
+   * @param description The text appended to "Unable to create " in the failure message.
+   * @param <T> The type of the created object.
+   * @return the new instance.
+   * @throws FlumeException if instantiation fails; the triggering exception is the cause.
+   */
+  private static <T> T instantiate(Class<T> clazz, String description) {
+    try {
+      return clazz.getDeclaredConstructor().newInstance();
+    } catch (ReflectiveOperationException | RuntimeException ex) {
+      throw new FlumeException("Unable to create " + description, ex);
+    }
+  }
+
+  /**
+   * Builds the Context used to configure a component.
+   *
+   * @param map The parameters, or {@code null} for none.
+   * @return a Context holding the parameters, or an empty Context when {@code map} is null.
+   */
+  private static Context createContext(Map<String, String> map) {
+    return map != null ? new Context(map) : new Context();
+  }
+}
diff --git a/flume-spring-boot/src/main/java/org/apache/flume/spring/boot/config/SpringConfiguration.java b/flume-spring-boot/src/main/java/org/apache/flume/spring/boot/config/SpringConfiguration.java
new file mode 100644
index 000000000..eac9a765a
--- /dev/null
+++ b/flume-spring-boot/src/main/java/org/apache/flume/spring/boot/config/SpringConfiguration.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.flume.spring.boot.config;
+
+import org.apache.flume.Channel;
+import org.apache.flume.SinkRunner;
+import org.apache.flume.SourceRunner;
+import org.apache.flume.node.MaterializedConfiguration;
+import org.apache.flume.node.SimpleMaterializedConfiguration;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import java.util.Map;
+
+/**
+ * Spring configuration that assembles a Flume {@link MaterializedConfiguration} from injected
+ * beans.
+ * <p>
+ * The three autowired maps hold the {@link Channel}, {@link SinkRunner} and {@link SourceRunner}
+ * instances to register. {@link #configuration()} copies every entry into a new
+ * {@link SimpleMaterializedConfiguration} under its map key and exposes the result as a bean.
+ * Presumably Spring keys each map by bean name - confirm against the Spring injection rules.
+ */
+@Configuration
+public class SpringConfiguration {
+
+ @Autowired
+ public Map<String, Channel> channels;
+
+ @Autowired
+ public Map<String, SinkRunner> sinkRunners;
+
+ @Autowired
+ public Map<String, SourceRunner> sourceRunners;
+
+ @Bean
+ public MaterializedConfiguration configuration() {
+ MaterializedConfiguration config = new SimpleMaterializedConfiguration();
+ for (Map.Entry<String, Channel> entry : channels.entrySet()) {
+ config.addChannel(entry.getKey(), entry.getValue());
+ }
+ for (Map.Entry<String, SinkRunner> entry : sinkRunners.entrySet()) {
+ config.addSinkRunner(entry.getKey(), entry.getValue());
+ }
+ for (Map.Entry<String, SourceRunner> entry : sourceRunners.entrySet()) {
+ config.addSourceRunner(entry.getKey(), entry.getValue());
+ }
+ return config;
+ }
+}
diff --git a/flume-spring-boot/src/main/java/org/apache/flume/spring/boot/runner/SpringFlume.java b/flume-spring-boot/src/main/java/org/apache/flume/spring/boot/runner/SpringFlume.java
new file mode 100644
index 000000000..2ddecad29
--- /dev/null
+++ b/flume-spring-boot/src/main/java/org/apache/flume/spring/boot/runner/SpringFlume.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.flume.spring.boot.runner;
+
+import com.google.common.collect.Lists;
+import org.apache.flume.lifecycle.LifecycleAware;
+import org.apache.flume.node.Application;
+import org.apache.flume.node.MaterializedConfiguration;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import java.util.List;
+
+/**
+ * Spring component that drives a Flume {@link Application} from the bean lifecycle.
+ * <p>
+ * The constructor stores the injected {@link MaterializedConfiguration} and creates the
+ * Application with an empty component list. {@link #startUp()} is annotated
+ * {@code @PostConstruct}: it starts the Application and then passes it the stored configuration
+ * through {@code handleConfigurationEvent}. {@link #shutdown()} is annotated {@code @PreDestroy}
+ * and stops the Application.
+ */
+@Component
+public class SpringFlume {
+ private MaterializedConfiguration materializedConfiguration;
+
+ private final Application application;
+
+ @Autowired
+ public SpringFlume(MaterializedConfiguration configuration) {
+ this.materializedConfiguration = configuration;
+ List<LifecycleAware> components = Lists.newArrayList();
+ application = new Application(components);
+ }
+
+ @PostConstruct
+ public void startUp() {
+ application.start();
+ application.handleConfigurationEvent(materializedConfiguration);
+ }
+
+ @PreDestroy
+ public void shutdown() {
+ application.stop();
+ }
+
+
+}
diff --git a/flume-spring-boot/src/test/java/org/apache/flume/spring/boot/AppTest.java b/flume-spring-boot/src/test/java/org/apache/flume/spring/boot/AppTest.java
new file mode 100644
index 000000000..af2654348
--- /dev/null
+++ b/flume-spring-boot/src/test/java/org/apache/flume/spring/boot/AppTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.flume.spring.boot;
+
+import org.apache.flume.node.MaterializedConfiguration;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.junit.jupiter.SpringExtension;
+
+/**
+ * Spring Boot test that loads the application context and inspects the injected
+ * {@link MaterializedConfiguration}: it must be non-null and report exactly one sink runner.
+ */
+@ExtendWith(SpringExtension.class)
+@SpringBootTest
+public class AppTest {
+
+ @Autowired
+ MaterializedConfiguration configuration;
+
+ @Test
+ public void contextLoads() {
+ Assertions.assertThat(configuration).isNotNull();
+ Assertions.assertThat(configuration.getSinkRunners()).isNotNull();
+ Assertions.assertThat(configuration.getSinkRunners().size()).isEqualTo(1);
+ }
+}
diff --git a/flume-spring-boot/src/test/java/org/apache/flume/spring/boot/config/AppConfig.java b/flume-spring-boot/src/test/java/org/apache/flume/spring/boot/config/AppConfig.java
new file mode 100644
index 000000000..cc9534376
--- /dev/null
+++ b/flume-spring-boot/src/test/java/org/apache/flume/spring/boot/config/AppConfig.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.flume.spring.boot.config;
+
+import org.apache.flume.Channel;
+import org.apache.flume.ChannelSelector;
+import org.apache.flume.Sink;
+import org.apache.flume.SinkProcessor;
+import org.apache.flume.SinkRunner;
+import org.apache.flume.SourceRunner;
+import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.channel.ReplicatingChannelSelector;
+import org.apache.flume.sink.DefaultSinkProcessor;
+import org.apache.flume.sink.NullSink;
+import org.apache.flume.source.SequenceGeneratorSource;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Test configuration that wires a small Flume flow as Spring beans: a
+ * {@link SequenceGeneratorSource} named "source1", a {@link MemoryChannel} named "channel1" and a
+ * {@link NullSink} named "null".
+ * <p>
+ * The source and channel settings are bound from the {@code flume.sources.source1} and
+ * {@code flume.channels.channel1} property prefixes; the sink is configured with no parameters.
+ * The source uses a {@link ReplicatingChannelSelector} over the memory channel and the sink is
+ * wrapped in a {@link SinkRunner} via a {@link DefaultSinkProcessor}.
+ */
+@Configuration
+public class AppConfig extends AbstractFlumeConfiguration {
+
+ @Bean
+ @ConfigurationProperties(prefix = "flume.sources.source1")
+ public Map<String, String> source1Properties() {
+ return new HashMap<>();
+ }
+
+ @Bean
+ @ConfigurationProperties(prefix = "flume.channels.channel1")
+ public Map<String, String> channel1Properties() {
+ return new HashMap<>();
+ }
+
+ @Bean
+ public Channel memoryChannel(Map<String, String> channel1Properties) {
+ return configureChannel("channel1", MemoryChannel.class, channel1Properties);
+ }
+
+ @Bean
+ public SourceRunner seqSource(Channel memoryChannel, Map<String, String> source1Properties) {
+ ChannelSelector selector = new ReplicatingChannelSelector();
+ selector.setChannels(listOf(memoryChannel));
+ return configureSource("source1", SequenceGeneratorSource.class, selector,
+ source1Properties);
+ }
+
+ @Bean
+ public SinkRunner nullSink(Channel memoryChannel) {
+ SinkProcessor sinkProcessor = new DefaultSinkProcessor();
+ Sink sink = configureSink("null", NullSink.class, memoryChannel,null);
+ sinkProcessor.setSinks(listOf(sink));
+ return new SinkRunner(sinkProcessor);
+ }
+}
diff --git a/flume-spring-boot/src/test/resources/application.yml b/flume-spring-boot/src/test/resources/application.yml
new file mode 100644
index 000000000..8e2e2e5ed
--- /dev/null
+++ b/flume-spring-boot/src/test/resources/application.yml
@@ -0,0 +1,8 @@
+
+flume:
+ sources:
+ source1:
+ totalEvents: 10000
+ channels:
+ channel1:
+ capacity: 10000
diff --git a/flume-spring-boot/src/test/resources/log4j2-test.xml b/flume-spring-boot/src/test/resources/log4j2-test.xml
new file mode 100644
index 000000000..d912f3105
--- /dev/null
+++ b/flume-spring-boot/src/test/resources/log4j2-test.xml
@@ -0,0 +1,31 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+-->
+<Configuration status="ERROR">
+ <Appenders>
+ <Console name="Console" target="SYSTEM_OUT">
+ <PatternLayout pattern="%d [%t] %-5level: %msg%n%throwable" />
+ </Console>
+ </Appenders>
+ <Loggers>
+ <Logger name="org.apache.flume" level="INFO" />
+ <Root level="WARN">
+ <AppenderRef ref="Console" />
+ </Root>
+ </Loggers>
+</Configuration>
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index f81743465..416035d2a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -70,7 +70,7 @@ limitations under the License.
<fasterxml.jackson.databind.version>2.13.2.1</fasterxml.jackson.databind.version>
<fest-reflect.version>1.4</fest-reflect.version>
<geronimo-jms.version>1.1.1</geronimo-jms.version>
- <gson.version>2.2.2</gson.version>
+ <gson.version>2.9.1</gson.version>
<guava.version>18.0</guava.version>
<guava-old.version>11.0.2</guava-old.version>
<hadoop.version>2.10.1</hadoop.version>
@@ -149,6 +149,7 @@ limitations under the License.
<module>flume-shared</module>
<module>flume-ng-configfilters</module>
<module>build-support</module>
+ <module>flume-spring-boot</module>
</modules>
<profiles>