You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by rg...@apache.org on 2022/10/03 16:30:59 UTC

[flume] 03/04: Spring Boot support

This is an automated email from the ASF dual-hosted git repository.

rgoers pushed a commit to branch flume-spring-boot
in repository https://gitbox.apache.org/repos/asf/flume.git

commit bbae4b8b7775bbc2051e6e2f8b5e9b1b9c13ea06
Author: rgoers <ra...@dslextreme.com>
AuthorDate: Thu Sep 22 12:00:48 2022 -0700

    Spring Boot support
---
 flume-spring-boot/pom.xml                          |  94 ++++++++++++++++++
 .../org/apache/flume/spring/boot/Application.java  |  33 +++++++
 .../boot/config/AbstractFlumeConfiguration.java    | 107 +++++++++++++++++++++
 .../spring/boot/config/SpringConfiguration.java    |  59 ++++++++++++
 .../flume/spring/boot/runner/SpringFlume.java      |  59 ++++++++++++
 .../java/org/apache/flume/spring/boot/AppTest.java |  43 +++++++++
 .../apache/flume/spring/boot/config/AppConfig.java |  75 +++++++++++++++
 .../src/test/resources/application.yml             |   8 ++
 .../src/test/resources/log4j2-test.xml             |  31 ++++++
 pom.xml                                            |   3 +-
 10 files changed, 511 insertions(+), 1 deletion(-)

diff --git a/flume-spring-boot/pom.xml b/flume-spring-boot/pom.xml
new file mode 100644
index 000000000..bfd058fcc
--- /dev/null
+++ b/flume-spring-boot/pom.xml
@@ -0,0 +1,94 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <artifactId>flume-parent</artifactId>
+    <groupId>org.apache.flume</groupId>
+    <version>1.11.0-SNAPSHOT</version>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>flume-spring-boot</artifactId>
+  <name>Flume Spring Boot Support</name>
+
+  <properties>
+    <module.name>org.apache.flume.spring.boot</module.name>
+    <spring-boot.version>2.7.3</spring-boot.version>
+  </properties>
+  <dependencyManagement>
+    <dependencies>
+      <dependency>
+        <groupId>org.springframework.boot</groupId>
+        <artifactId>spring-boot-dependencies</artifactId>
+        <version>${spring-boot.version}</version>
+        <type>pom</type>
+        <scope>import</scope>
+      </dependency>
+    </dependencies>
+  </dependencyManagement>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-configuration</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-node</artifactId>
+    </dependency>
+    <!-- Spring Boot dependencies -->
+    <dependency>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-starter-actuator</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-starter</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>org.springframework.boot</groupId>
+          <artifactId>spring-boot-starter-logging</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-starter-web</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-starter-test</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <!-- log dependencies -->
+    <dependency>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-starter-log4j2</artifactId>
+    </dependency>
+  </dependencies>
+  <build>
+
+  </build>
+
+</project>
\ No newline at end of file
diff --git a/flume-spring-boot/src/main/java/org/apache/flume/spring/boot/Application.java b/flume-spring-boot/src/main/java/org/apache/flume/spring/boot/Application.java
new file mode 100644
index 000000000..4edbad5f5
--- /dev/null
+++ b/flume-spring-boot/src/main/java/org/apache/flume/spring/boot/Application.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+
+package org.apache.flume.spring.boot;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+
+/**
+ *
+ */
+@SpringBootApplication(scanBasePackages = {"org.apache.flume.spring.boot"})
+@EnableConfigurationProperties
+public class Application {
+  public static void main(String[] args) {
+    SpringApplication.run(Application.class, args);
+  }
+}
diff --git a/flume-spring-boot/src/main/java/org/apache/flume/spring/boot/config/AbstractFlumeConfiguration.java b/flume-spring-boot/src/main/java/org/apache/flume/spring/boot/config/AbstractFlumeConfiguration.java
new file mode 100644
index 000000000..0846e46dd
--- /dev/null
+++ b/flume-spring-boot/src/main/java/org/apache/flume/spring/boot/config/AbstractFlumeConfiguration.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.flume.spring.boot.config;
+
+import org.apache.flume.Channel;
+import org.apache.flume.ChannelSelector;
+import org.apache.flume.Context;
+import org.apache.flume.FlumeException;
+import org.apache.flume.Sink;
+import org.apache.flume.Source;
+import org.apache.flume.SourceRunner;
+import org.apache.flume.channel.ChannelProcessor;
+import org.apache.flume.conf.Configurables;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+/**
+ *
+ */
+public abstract class AbstractFlumeConfiguration {
+
+  protected <T extends Channel> T configureChannel(final String name, final Class<T> clazz,
+                                                   final Map<String, String> params) {
+    T channel;
+    try {
+      channel = clazz.newInstance();
+    } catch (Exception ex) {
+      throw new FlumeException("Unable to create channel " + name, ex);
+    }
+    channel.setName(name);
+    Configurables.configure(channel, createContext(params));
+    return channel;
+  }
+
+  protected <T extends Source> SourceRunner configureSource(final String name, final Class<T> clazz,
+      final ChannelSelector selector, final Map<String, String> params) {
+    T source;
+    try {
+      source = clazz.newInstance();
+    } catch (Exception ex) {
+      throw new FlumeException("Unable to create source " + name, ex);
+    }
+    source.setName(name);
+    Configurables.configure(source, createContext(params));
+    source.setChannelProcessor(new ChannelProcessor(selector));
+    return SourceRunner.forSource(source);
+  }
+
+  protected <T extends Sink> Sink configureSink(final String name, final Class<T> clazz,
+                                                final Channel channel,
+                                                final Map<String, String> params) {
+    T sink;
+    try {
+      sink = clazz.newInstance();
+    } catch (Exception ex) {
+      throw new FlumeException("Unable to create sink " + name, ex);
+    }
+    sink.setName(name);
+    Configurables.configure(sink, createContext(params));
+    sink.setChannel(channel);
+    return sink;
+  }
+
+  protected ChannelSelector createChannelSelector(Class<? extends ChannelSelector> clazz,
+                                                  Map<String, String> params) {
+    ChannelSelector selector;
+    try {
+      selector = clazz.newInstance();
+    } catch (Exception ex) {
+      throw new FlumeException("Unable to create channel selector " + clazz.getName(), ex);
+    }
+    Configurables.configure(selector, createContext(params));
+    return selector;
+  }
+
+  /**
+   * Creates a List from a Varargs array.
+   *
+   * @param items The items to add to the list.
+   * @param <T>   The type of objects in the List.
+   * @return a List containing the supplied items.
+   */
+  @SafeVarargs
+  protected final <T> List<T> listOf(T... items) {
+    return Arrays.asList(items);
+  }
+
+  private static Context createContext(Map<String, String> map) {
+    return map != null ? new Context(map) : new Context();
+  }
+}
diff --git a/flume-spring-boot/src/main/java/org/apache/flume/spring/boot/config/SpringConfiguration.java b/flume-spring-boot/src/main/java/org/apache/flume/spring/boot/config/SpringConfiguration.java
new file mode 100644
index 000000000..eac9a765a
--- /dev/null
+++ b/flume-spring-boot/src/main/java/org/apache/flume/spring/boot/config/SpringConfiguration.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.flume.spring.boot.config;
+
+import org.apache.flume.Channel;
+import org.apache.flume.SinkRunner;
+import org.apache.flume.SourceRunner;
+import org.apache.flume.node.MaterializedConfiguration;
+import org.apache.flume.node.SimpleMaterializedConfiguration;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import java.util.Map;
+
+/**
+ *
+ */
+@Configuration
+public class SpringConfiguration {
+
+  @Autowired
+  public Map<String, Channel> channels;
+
+  @Autowired
+  public Map<String, SinkRunner> sinkRunners;
+
+  @Autowired
+  public Map<String, SourceRunner> sourceRunners;
+
+  @Bean
+  public MaterializedConfiguration configuration() {
+    MaterializedConfiguration config = new SimpleMaterializedConfiguration();
+    for (Map.Entry<String, Channel> entry : channels.entrySet()) {
+      config.addChannel(entry.getKey(), entry.getValue());
+    }
+    for (Map.Entry<String, SinkRunner> entry : sinkRunners.entrySet()) {
+      config.addSinkRunner(entry.getKey(), entry.getValue());
+    }
+    for (Map.Entry<String, SourceRunner> entry : sourceRunners.entrySet()) {
+      config.addSourceRunner(entry.getKey(), entry.getValue());
+    }
+    return config;
+  }
+}
diff --git a/flume-spring-boot/src/main/java/org/apache/flume/spring/boot/runner/SpringFlume.java b/flume-spring-boot/src/main/java/org/apache/flume/spring/boot/runner/SpringFlume.java
new file mode 100644
index 000000000..2ddecad29
--- /dev/null
+++ b/flume-spring-boot/src/main/java/org/apache/flume/spring/boot/runner/SpringFlume.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.flume.spring.boot.runner;
+
+import com.google.common.collect.Lists;
+import org.apache.flume.lifecycle.LifecycleAware;
+import org.apache.flume.node.Application;
+import org.apache.flume.node.MaterializedConfiguration;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import java.util.List;
+
+/**
+ *
+ */
+@Component
+public class SpringFlume {
+  private MaterializedConfiguration materializedConfiguration;
+
+  private final Application application;
+
+  @Autowired
+  public SpringFlume(MaterializedConfiguration configuration) {
+    this.materializedConfiguration = configuration;
+    List<LifecycleAware> components = Lists.newArrayList();
+    application = new Application(components);
+  }
+
+  @PostConstruct
+  public void startUp() {
+    application.start();
+    application.handleConfigurationEvent(materializedConfiguration);
+  }
+
+  @PreDestroy
+  public void shutdown() {
+    application.stop();
+  }
+
+
+}
diff --git a/flume-spring-boot/src/test/java/org/apache/flume/spring/boot/AppTest.java b/flume-spring-boot/src/test/java/org/apache/flume/spring/boot/AppTest.java
new file mode 100644
index 000000000..af2654348
--- /dev/null
+++ b/flume-spring-boot/src/test/java/org/apache/flume/spring/boot/AppTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.flume.spring.boot;
+
+import org.apache.flume.node.MaterializedConfiguration;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.junit.jupiter.SpringExtension;
+
+/**
+ *
+ */
+@ExtendWith(SpringExtension.class)
+@SpringBootTest
+public class AppTest {
+
+  @Autowired
+  MaterializedConfiguration configuration;
+
+  @Test
+  public void contextLoads() {
+    Assertions.assertThat(configuration).isNotNull();
+    Assertions.assertThat(configuration.getSinkRunners()).isNotNull();
+    Assertions.assertThat(configuration.getSinkRunners().size()).isEqualTo(1);
+  }
+}
diff --git a/flume-spring-boot/src/test/java/org/apache/flume/spring/boot/config/AppConfig.java b/flume-spring-boot/src/test/java/org/apache/flume/spring/boot/config/AppConfig.java
new file mode 100644
index 000000000..cc9534376
--- /dev/null
+++ b/flume-spring-boot/src/test/java/org/apache/flume/spring/boot/config/AppConfig.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.flume.spring.boot.config;
+
+import org.apache.flume.Channel;
+import org.apache.flume.ChannelSelector;
+import org.apache.flume.Sink;
+import org.apache.flume.SinkProcessor;
+import org.apache.flume.SinkRunner;
+import org.apache.flume.SourceRunner;
+import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.channel.ReplicatingChannelSelector;
+import org.apache.flume.sink.DefaultSinkProcessor;
+import org.apache.flume.sink.NullSink;
+import org.apache.flume.source.SequenceGeneratorSource;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ *
+ */
+@Configuration
+public class AppConfig extends AbstractFlumeConfiguration {
+
+  @Bean
+  @ConfigurationProperties(prefix = "flume.sources.source1")
+  public Map<String, String> source1Properties() {
+    return new HashMap<>();
+  }
+
+  @Bean
+  @ConfigurationProperties(prefix = "flume.channels.channel1")
+  public Map<String, String> channel1Properties() {
+    return new HashMap<>();
+  }
+
+  @Bean
+  public Channel memoryChannel(Map<String, String> channel1Properties) {
+    return configureChannel("channel1", MemoryChannel.class, channel1Properties);
+  }
+
+  @Bean
+  public SourceRunner seqSource(Channel memoryChannel, Map<String, String> source1Properties) {
+    ChannelSelector selector = new ReplicatingChannelSelector();
+    selector.setChannels(listOf(memoryChannel));
+    return configureSource("source1", SequenceGeneratorSource.class, selector,
+        source1Properties);
+  }
+
+  @Bean
+  public SinkRunner nullSink(Channel memoryChannel) {
+    SinkProcessor sinkProcessor = new DefaultSinkProcessor();
+    Sink sink = configureSink("null", NullSink.class, memoryChannel,null);
+    sinkProcessor.setSinks(listOf(sink));
+    return new SinkRunner(sinkProcessor);
+  }
+}
diff --git a/flume-spring-boot/src/test/resources/application.yml b/flume-spring-boot/src/test/resources/application.yml
new file mode 100644
index 000000000..8e2e2e5ed
--- /dev/null
+++ b/flume-spring-boot/src/test/resources/application.yml
@@ -0,0 +1,8 @@
+
+flume:
+  sources:
+    source1:
+      totalEvents: 10000
+  channels:
+    channel1:
+      capacity: 10000
diff --git a/flume-spring-boot/src/test/resources/log4j2-test.xml b/flume-spring-boot/src/test/resources/log4j2-test.xml
new file mode 100644
index 000000000..d912f3105
--- /dev/null
+++ b/flume-spring-boot/src/test/resources/log4j2-test.xml
@@ -0,0 +1,31 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+-->
+<Configuration status="ERROR">
+    <Appenders>
+        <Console name="Console" target="SYSTEM_OUT">
+            <PatternLayout pattern="%d [%t] %-5level: %msg%n%throwable" />
+        </Console>
+    </Appenders>
+    <Loggers>
+        <Logger name="org.apache.flume" level="INFO" />
+        <Root level="WARN">
+            <AppenderRef ref="Console" />
+        </Root>
+    </Loggers>
+</Configuration>
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 4c657befc..edd28148b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -70,7 +70,7 @@ limitations under the License.
     <fasterxml.jackson.databind.version>2.13.2.1</fasterxml.jackson.databind.version>
     <fest-reflect.version>1.4</fest-reflect.version>
     <geronimo-jms.version>1.1.1</geronimo-jms.version>
-    <gson.version>2.2.2</gson.version>
+    <gson.version>2.9.1</gson.version>
     <guava.version>18.0</guava.version>
     <guava-old.version>11.0.2</guava-old.version>
     <hadoop.version>2.10.1</hadoop.version>
@@ -149,6 +149,7 @@ limitations under the License.
     <module>flume-shared</module>
     <module>flume-ng-configfilters</module>
     <module>build-support</module>
+    <module>flume-spring-boot</module>
   </modules>
 
   <profiles>